/*
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.distributed.locking;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTransientException;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.drools.core.lock.AlwaysFailLock;
import org.onap.policy.drools.core.lock.Lock;
import org.onap.policy.drools.core.lock.LockCallback;
import org.onap.policy.drools.core.lock.LockImpl;
import org.onap.policy.drools.core.lock.LockState;
import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Distributed implementation of the Lock Feature. Maintains locks across servers using a
 * shared DB.
 *
 * <p>Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
 * parameter.
 *
 * <p>Additional Notes:
 * <ul>
 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
 * instead populated with the {@link #uuidString}.</li>
 * <li>A periodic check of the DB is made to determine if any of the locks have
 * expired.</li>
 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
 * will be added to the map once free() or extend() is invoked, provided there isn't
 * already an entry. In addition, it initially has the host and UUID of the feature
 * instance that created it. However, as soon as doExtend() completes successfully, the
 * host and UUID of the lock will be updated to reflect the values within this feature
 * instance.</li>
 * </ul>
 */
public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi {
// Logger shared by this manager and its nested DistributedLock class.
private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
// Name handed to getProperties() when loading this feature's configuration.
private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
// Reason passed to deny() when a lock is found to be gone or a request ultimately fails.
private static final String LOCK_LOST_MSG = "lock lost";
// Reason passed to deny() when extend() finds that the lock is not the current map entry.
private static final String NOT_LOCKED_MSG = "not locked";
/**
* Manager instance most recently passed to setLatestInstance(), which afterStart() invokes.
* createLock() only hands out real locks when invoked on this instance, and locks that
* have no feature attached adopt this instance in attachFeature().
*/
@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
private static DistributedLockManager latestInstance = null;
/**
* Name of the host on which this JVM is running. Assigned once, in the constructor, from
* NetworkUtil.getHostname(); written to the "host" column of lock records.
*/
@Getter
private final String hostName;
/**
* UUID of this object. Generated randomly per instance; written to the "owner" column
* of lock records.
*/
@Getter
private final String uuidString = UUID.randomUUID().toString();
/**
 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
 * lock is added to the map, it remains in the map until the lock is lost or until the
 * unlock request completes.
 *
 * <p>Keys are resource identifiers; values are the locks created by this feature (or
 * deserialized locks that attached themselves to it). The map is fully parameterized so
 * that callers get typed results from {@code putIfAbsent()} / {@code computeIfPresent()}.
 */
private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
/**
* Engine with which this manager is associated. Assigned in beforeCreateLockManager();
* getThreadPool() reads its executor service.
*/
private PolicyEngine engine;
/**
* Feature properties. Loaded in beforeCreateLockManager() from the configuration named
* by CONFIGURATION_PROPERTIES_NAME.
*/
private DistributedLockProperties featProps;
/**
* Thread pool used to check for lock expiration and to notify owners when locks are
* granted or lost. Assigned in beforeCreateLockManager() and reset to {@code null} by
* afterStop(); isAlive() reports whether it is non-null.
*/
private ScheduledExecutorService exsvc = null;
/**
* Data source used to connect to the DB. Created by makeDataSource() in
* beforeCreateLockManager(); closed and reset to {@code null} by closeDataSource().
*/
private BasicDataSource dataSource = null;
/**
 * Creates the manager, recording the name of the local host as reported by
 * {@code NetworkUtil.getHostname()}.
 */
public DistributedLockManager() {
    hostName = NetworkUtil.getHostname();
}
/**
 * Gets the sequence number of this feature.
 *
 * @return the fixed value 1000
 */
@Override
public int getSequenceNumber() {
    return 1000;
}
/**
 * Indicates whether this manager currently has a thread pool.
 *
 * @return {@code true} if the executor service is set, {@code false} otherwise
 */
@Override
public boolean isAlive() {
    return exsvc != null;
}
/**
 * No-op; start-up work is done in the engine feature hooks instead.
 *
 * @return always {@code true}
 */
@Override
public boolean start() {
    // handled via engine API
    return true;
}
/**
 * No-op; shut-down work is done in the engine feature hooks instead.
 *
 * @return always {@code true}
 */
@Override
public boolean stop() {
    // handled via engine API
    return true;
}
/**
 * No-op; shut-down work is done in the engine feature hooks instead.
 */
@Override
public void shutdown() {
    // handled via engine API
}
/**
 * Reports the administrative lock state of this manager.
 *
 * @return always {@code false}
 */
@Override
public boolean isLocked() {
    return false;
}
/**
 * No-op request to administratively lock this manager.
 *
 * @return always {@code true}
 */
@Override
public boolean lock() {
    return true;
}
/**
 * No-op request to administratively unlock this manager.
 *
 * @return always {@code true}
 */
@Override
public boolean unlock() {
    return true;
}
/**
 * Initializes this manager for the given engine: loads the feature properties, obtains
 * the thread pool and creates the data source.
 *
 * @param engine engine with which this manager is to be associated
 * @param properties not used by this implementation
 * @return this manager
 * @throws DistributedLockManagerException wrapping any exception raised during
 *         initialization
 */
@Override
public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
    try {
        this.engine = engine;

        Properties featureProperties = getProperties(CONFIGURATION_PROPERTIES_NAME);
        featProps = new DistributedLockProperties(featureProperties);

        exsvc = getThreadPool();
        dataSource = makeDataSource();

    } catch (Exception e) {
        throw new DistributedLockManagerException(e);
    }

    return this;
}
/**
 * Queues a purge of expired DB locks, schedules the first expiration check and then
 * publishes this manager as the latest instance.
 *
 * @param engine engine that was started (not used)
 * @return always {@code false}
 * @throws DistributedLockManagerException wrapping any exception raised while
 *         scheduling the work
 */
@Override
public boolean afterStart(PolicyEngine engine) {
    try {
        exsvc.execute(() -> deleteExpiredDbLocks());
        exsvc.schedule(() -> checkExpired(), featProps.getExpireCheckSec(), TimeUnit.SECONDS);

        setLatestInstance(this);

    } catch (Exception e) {
        throw new DistributedLockManagerException(e);
    }

    return false;
}
/**
 * Builds the pooled data source from the DB settings held in the feature properties.
 * Connections are tested on borrow and prepared statements are pooled.
 *
 * @return a new, pooled data source
 * @throws Exception if the data source cannot be created
 */
protected BasicDataSource makeDataSource() throws Exception {
    Properties poolProps = new Properties();

    // connection settings
    poolProps.put("driverClassName", featProps.getDbDriver());
    poolProps.put("url", featProps.getDbUrl());
    poolProps.put("username", featProps.getDbUser());
    poolProps.put("password", featProps.getDbPwd());

    // pool behavior; additional properties are listed in the GenericObjectPool API
    poolProps.put("testOnBorrow", "true");
    poolProps.put("poolPreparedStatements", "true");

    return BasicDataSourceFactory.createDataSource(poolProps);
}
/**
 * Purges, from the DB, every lock record whose expiration time has passed. Failures are
 * logged and otherwise ignored.
 */
private void deleteExpiredDbLocks() {
    final String sql = "DELETE FROM pooling.locks WHERE expirationTime <= now()";

    logger.info("deleting all expired locks from the DB");

    try (Connection conn = dataSource.getConnection();
                    PreparedStatement stmt = conn.prepareStatement(sql)) {

        int ndel = stmt.executeUpdate();
        logger.info("deleted {} expired locks from the DB", ndel);

    } catch (SQLException e) {
        logger.warn("failed to delete expired locks from the DB", e);
    }
}
/**
 * Drops the reference to the thread pool and closes the data source. Does not invoke
 * any lock call-backs.
 *
 * @param engine engine that was stopped (not used)
 * @return always {@code false}
 */
@Override
public boolean afterStop(PolicyEngine engine) {
    exsvc = null;
    closeDataSource();

    return false;
}
/**
 * Closes {@link #dataSource}, if there is one, and sets it to {@code null}. A failure
 * to close is logged, but the reference is still cleared.
 */
private void closeDataSource() {
    if (dataSource != null) {
        try {
            dataSource.close();

        } catch (SQLException e) {
            logger.error("cannot close the distributed locking DB", e);
        }
    }

    dataSource = null;
}
/**
 * Creates a lock on a resource. If this manager is not the latest instance, the
 * returned lock always fails. Otherwise, a new lock is placed in the map, if the
 * resource has no entry yet, and a DB request is queued for it; if the resource already
 * has an entry, the new lock is denied immediately.
 *
 * @param resourceId identifier of the resource to be locked
 * @param ownerKey information identifying the owner requesting the lock
 * @param holdSec amount of time, in seconds, for which the lock should be held
 * @param callback callback to be invoked once the lock is granted or denied
 * @param waitForLock not used by this implementation
 * @return a new lock
 */
@Override
public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
                boolean waitForLock) {

    if (latestInstance != this) {
        AlwaysFailLock failed = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
        failed.notifyUnavailable();
        return failed;
    }

    DistributedLock candidate = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
    DistributedLock current = resource2lock.putIfAbsent(resourceId, candidate);

    // the follow-up work is done here, after the map operation has completed
    if (current != null) {
        candidate.deny("resource is busy", true);
        return candidate;
    }

    logger.debug("added lock to map {}", candidate);
    candidate.scheduleRequest(candidate::doLock);

    return candidate;
}
/**
 * Checks for expired locks. Takes a snapshot of the resource IDs currently in the map,
 * removes from it those that the DB still shows as locked by this instance, and expires
 * the remainder. Afterwards, the check re-schedules itself: after the normal interval
 * on success, or after the retry interval if a DB or runtime error occurred. If the
 * thread pool rejects the work, no further check is scheduled.
 */
private void checkExpired() {
    try {
        logger.info("checking for expired locks");
        // typed snapshot, so the helpers operate on resource IDs rather than raw objects
        Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
        identifyDbLocks(expiredIds);
        expireLocks(expiredIds);

        exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);

    } catch (RejectedExecutionException e) {
        logger.warn("thread pool is no longer accepting requests", e);

    } catch (SQLException | RuntimeException e) {
        logger.error("error checking expired locks", e);
        exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
    }

    logger.info("done checking for expired locks");
}
/**
 * Identifies this feature instance's locks that the DB indicates are still active.
 *
 * @param expiredIds IDs of resources that have expired locks. If a resource is still
 *        locked, its ID is removed from this set
 * @throws SQLException if a DB error occurs
 */
private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
    /*
     * We could query for host and UUIDs that actually appear within the locks, but
     * those might change while the query is running so no real value in doing that.
     * On the other hand, there's only a brief instant between the time a
     * deserialized lock is added to this feature instance and its doExtend() method
     * updates its host and UUID to match this feature instance. If this happens to
     * run during that brief instant, then the lock will be lost and the callback
     * invoked. It isn't worth complicating this code further to handle those highly
     * unlikely cases.
     */

    // @formatter:off
    try (Connection conn = dataSource.getConnection();
                    PreparedStatement stmt = conn.prepareStatement(
                        "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
        // @formatter:on

        stmt.setString(1, hostName);
        stmt.setString(2, uuidString);

        try (ResultSet resultSet = stmt.executeQuery()) {
            while (resultSet.next()) {
                String resourceId = resultSet.getString(1);

                // we have now seen this resource id
                expiredIds.remove(resourceId);
            }
        }
    }
}
/**
 * Expires locks for the resources that no longer appear within the DB. For each ID, the
 * map entry is removed only if its lock still considers itself active; the removed lock
 * is then denied, with the callback invoked via the thread pool.
 *
 * @param expiredIds IDs of resources that have expired locks
 */
private void expireLocks(Set<String> expiredIds) {
    for (String resourceId : expiredIds) {
        // carries the removed lock, if any, out of the lambda
        AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);

        resource2lock.computeIfPresent(resourceId, (key, lock) -> {
            if (lock.isActive()) {
                // it thinks it's active, but it isn't - remove from the map
                lockref.set(lock);
                return null;
            }

            return lock;
        });

        // notify outside of the map operation
        DistributedLock expired = lockref.get();
        if (expired != null) {
            logger.debug("removed lock from map {}", expired);
            expired.deny(LOCK_LOST_MSG, false);
        }
    }
}
/**
* Distributed Lock implementation.
*/
public static class DistributedLock extends LockImpl {
// Log message format used by doRequest() when a request throws an exception.
private static final String SQL_FAILED_MSG = "request failed for lock: {}";
private static final long serialVersionUID = 1L;
/**
* Feature containing this lock. May be {@code null} until the feature is
* identified. Note: this can only be null if the lock has been de-serialized.
* Being transient, it is re-populated by attachFeature().
*/
private transient DistributedLockManager feature;
/**
* Host name from the feature instance that created this object. Replaced with the
* host name from the current feature instance whenever the lock is successfully
* extended. Used in the WHERE clause of the update and delete statements.
*/
private String hostName;
/**
* UUID string from the feature instance that created this object. Replaced with
* the UUID string from the current feature instance whenever the lock is
* successfully extended. Used in the WHERE clause of the update and delete statements.
*/
private String uuidString;
/**
* {@code True} if the lock is busy making a request, {@code false} otherwise.
* Set by doRequest() and cleared once no further request is pending.
*/
private transient boolean busy = false;
/**
* Request to be performed. Only the most recently scheduled request is kept;
* {@code null} when nothing is pending.
*/
private transient RunnableWithEx request = null;
/**
* Number of times we've retried a request. Reset whenever a new request is scheduled.
*/
private transient int nretries = 0;
/**
 * Constructs a lock that has no feature attached, with an empty host name and an empty
 * UUID string.
 */
public DistributedLock() {
    hostName = "";
    uuidString = "";
}
/**
 * Constructs a lock that belongs to the given feature, copying the feature's host name
 * and UUID string.
 *
 * @param state initial state of the lock
 * @param resourceId identifier of the resource to be locked
 * @param ownerKey information identifying the owner requesting the lock
 * @param holdSec amount of time, in seconds, for which the lock should be held,
 *        after which it will automatically be released
 * @param callback callback to be invoked once the lock is granted, or
 *        subsequently lost; must not be {@code null}
 * @param feature feature containing this lock
 */
public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
                DistributedLockManager feature) {

    super(state, resourceId, ownerKey, holdSec, callback);

    this.feature = feature;
    hostName = feature.hostName;
    uuidString = feature.uuidString;
}
/**
 * Marks this lock active and notifies the owner, unless the lock has already become
 * unavailable, in which case nothing is done. The notification is invoked in the
 * calling thread.
 */
protected void grant() {
    boolean activated;

    synchronized (this) {
        activated = !isUnavailable();
        if (activated) {
            setState(LockState.ACTIVE);
        }
    }

    if (!activated) {
        return;
    }

    logger.info("lock granted: {}", this);
    notifyAvailable();
}
/**
 * Permanently denies this lock: marks it unavailable and notifies the owner.
 *
 * @param reason the reason the lock was denied
 * @param foreground {@code true} if the callback can be invoked in the current
 *        (i.e., foreground) thread, {@code false} if it should be invoked via the
 *        executor. When no feature is attached, the callback is always invoked in
 *        the current thread
 */
protected void deny(String reason, boolean foreground) {
    synchronized (this) {
        setState(LockState.UNAVAILABLE);
    }

    logger.info("{}: {}", reason, this);

    if (feature != null && !foreground) {
        feature.exsvc.execute(this::notifyUnavailable);
    } else {
        notifyUnavailable();
    }
}
/**
 * Releases this lock. If the lock is the current map entry for its resource and is not
 * already unavailable, it is marked unavailable and a request to delete the DB record
 * is queued.
 *
 * @return {@code true} if the unlock request was queued, {@code false} if the lock was
 *         already unavailable, no feature could be attached, or the lock does not own
 *         the resource
 */
@Override
public boolean free() {
    // cheap, unsynchronized pre-check
    if (isUnavailable()) {
        return false;
    }

    logger.info("releasing lock: {}", this);

    if (!attachFeature()) {
        setState(LockState.UNAVAILABLE);
        return false;
    }

    AtomicBoolean wasOwner = new AtomicBoolean(false);

    feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
        boolean owned = (curlock == this && !isUnavailable());
        if (owned) {
            wasOwner.set(true);
            setState(LockState.UNAVAILABLE);
        }

        // NOTE: the entry is deliberately kept; it must remain until doUnlock completes
        return curlock;
    });

    if (!wasOwner.get()) {
        return false;
    }

    scheduleRequest(this::doUnlock);
    return true;
}
/**
 * Extends this lock. Records the new hold time and callback, then, if this lock is the
 * current map entry for its resource, puts it back into the waiting state and queues a
 * request to update the DB record. Otherwise, the lock is denied in the calling thread.
 *
 * @param holdSec amount of time, in seconds, for which the lock should be held
 * @param callback callback to be invoked when the extension succeeds or fails
 * @throws IllegalArgumentException if holdSec is negative
 */
@Override
public void extend(int holdSec, LockCallback callback) {
    if (holdSec < 0) {
        throw new IllegalArgumentException("holdSec is negative");
    }

    setHoldSec(holdSec);
    setCallback(callback);

    // cheap, unsynchronized pre-check
    if (isUnavailable() || !attachFeature()) {
        deny(LOCK_LOST_MSG, true);
        return;
    }

    AtomicBoolean ownsResource = new AtomicBoolean(false);

    feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
        if (curlock == this && !isUnavailable()) {
            ownsResource.set(true);
            setState(LockState.WAITING);
        }

        // note: leave it in the map until doUnlock() removes it
        return curlock;
    });

    if (!ownsResource.get()) {
        deny(NOT_LOCKED_MSG, true);
        return;
    }

    scheduleRequest(this::doExtend);
}
/**
 * Attaches to the latest feature instance, if not already attached to a feature. On a
 * fresh attachment, this lock is added to the feature's map, unless the resource
 * already has an entry.
 *
 * @return {@code true} if the lock is now attached to a feature, {@code false}
 *         otherwise
 */
private synchronized boolean attachFeature() {
    if (feature == null) {
        feature = latestInstance;

        if (feature == null) {
            logger.warn("no feature yet for {}", this);
            return false;
        }

        // newly attached - put this lock into the map
        feature.resource2lock.putIfAbsent(getResourceId(), this);
    }

    return true;
}
/**
 * Makes the given request the pending request, resets the retry count, and asks the
 * thread pool to process it.
 *
 * @param schedreq the request that should be scheduled
 */
private synchronized void scheduleRequest(RunnableWithEx schedreq) {
    logger.debug("schedule lock action {}", this);

    request = schedreq;
    nretries = 0;

    feature.exsvc.execute(() -> doRequest());
}
/**
 * Reschedules a request for execution after the retry interval, if there is not already
 * a request pending, and if the retry count has not been exhausted. When the retries
 * are exhausted, the lock is removed from the map instead.
 *
 * @param req request to be rescheduled
 */
private void rescheduleRequest(RunnableWithEx req) {
    synchronized (this) {
        if (request != null) {
            // a new request has already been scheduled - it supersedes "req"
            logger.debug("not rescheduling lock action {}", this);
            return;
        }

        boolean retryAllowed = (nretries++ < feature.featProps.getMaxRetries());
        if (retryAllowed) {
            logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
            request = req;
            feature.exsvc.schedule(() -> doRequest(), feature.featProps.getRetrySec(), TimeUnit.SECONDS);
            return;
        }
    }

    // give up - done outside of the synchronized block
    logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
    removeFromMap();
}
/**
 * Takes the pending request, if there is one and it is not the request that was just
 * run. Otherwise, clears {@link #busy}, as there is nothing more to do.
 *
 * @param prevReq the previous request that was just run
 *
 * @return the next request, or {@code null} if there is none
 */
private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
    RunnableWithEx pending = request;

    if (pending != null && pending != prevReq) {
        request = null;
        return pending;
    }

    logger.debug("no more requests for {}", this);
    busy = false;
    return null;
}
/**
 * Executes the pending request(s), if no other thread is currently executing them.
 *
 * <p>A request that fails with a transient SQL error is rescheduled for a later retry.
 * An error is deemed transient if the exception itself, or its cause, is a
 * {@link SQLTransientException}; the former covers errors raised directly by the
 * driver, the latter covers errors that arrive wrapped in another
 * {@link SQLException}. Any other failure removes the lock from the map.
 */
private void doRequest() {
    synchronized (this) {
        if (busy) {
            // another thread is already processing the request(s)
            return;
        }
        busy = true;
    }

    /*
     * There is a race condition wherein this thread could invoke run() while the
     * next scheduled thread checks the busy flag and finds that work is being
     * done and returns, leaving the next work item in "request". In that case,
     * the next work item may never be executed, thus we use a loop here, instead
     * of just executing a single request.
     */
    RunnableWithEx req = null;
    while ((req = getNextRequest(req)) != null) {
        if (feature.resource2lock.get(getResourceId()) != this) {
            /*
             * no longer in the map - don't apply the action, as it may interfere
             * with any newly added Lock object
             */
            logger.debug("discard lock action {}", this);
            synchronized (this) {
                busy = false;
            }
            return;
        }

        try {
            /*
             * Run the request. If it throws a transient exception, then it will be
             * rescheduled for execution a little later.
             */
            req.run();

        } catch (SQLException e) {
            logger.warn(SQL_FAILED_MSG, this, e);

            boolean isTransient =
                            (e instanceof SQLTransientException || e.getCause() instanceof SQLTransientException);

            if (isTransient) {
                // retry the request a little later
                rescheduleRequest(req);
            } else {
                removeFromMap();
            }

        } catch (RuntimeException e) {
            logger.warn(SQL_FAILED_MSG, this, e);
            removeFromMap();
        }
    }
}
/**
 * Attempts to add a lock to the DB, first via an insert and, should the insert throw,
 * via an update. Generates a callback, indicating success or failure. Does nothing if
 * the lock is no longer waiting.
 *
 * @throws SQLException if a DB error occurs
 */
private void doLock() throws SQLException {
    if (!isWaiting()) {
        logger.debug("discard doLock {}", this);
        return;
    }

    /*
     * A client may invoke free() before the DB has been updated. Should that happen,
     * doUnlock is queued behind this request and deletes the record, as desired, and
     * grant() does nothing, because free() will have already set the state to
     * UNAVAILABLE.
     */
    logger.debug("doLock {}", this);

    try (Connection conn = feature.dataSource.getConnection()) {
        boolean recorded;

        try {
            recorded = doDbInsert(conn);

        } catch (SQLException e) {
            logger.info("failed to insert lock record - attempting update: {}", this, e);
            recorded = doDbUpdate(conn);
        }

        if (recorded) {
            grant();
            return;
        }
    }

    // neither the insert nor the update took effect
    removeFromMap();
}
/**
 * Deletes the lock's record from the DB and then removes the lock from the map. Only
 * executed in response to a call to {@link #free()}; if the delete throws, the
 * exception propagates and the lock is not removed from the map here.
 *
 * @throws SQLException if a DB error occurs
 */
private void doUnlock() throws SQLException {
    logger.debug("unlock {}", this);

    try (Connection dbConn = feature.dataSource.getConnection()) {
        doDbDelete(dbConn);
    }

    removeFromMap();
}
/**
 * Attempts to extend a lock in the DB, first via an update and, if no record was
 * updated, via an insert. Generates a callback, indicating success or failure. Does
 * nothing if the lock is no longer waiting.
 *
 * @throws SQLException if a DB error occurs
 */
private void doExtend() throws SQLException {
    if (!isWaiting()) {
        logger.debug("discard doExtend {}", this);
        return;
    }

    /*
     * A client may invoke free() before the DB has been updated. Should that happen,
     * doUnlock is queued behind this request and deletes the record, as desired, and
     * grant() does nothing, because free() will have already set the state to
     * UNAVAILABLE.
     */
    logger.debug("doExtend {}", this);

    try (Connection conn = feature.dataSource.getConnection()) {
        /*
         * the record may not exist yet if extend() was invoked before it was
         * inserted, thus an insert is attempted when the update changes nothing
         */
        boolean extended = doDbUpdate(conn) || doDbInsert(conn);

        if (extended) {
            grant();
            return;
        }
    }

    removeFromMap();
}
/**
 * Inserts a record for this lock into the DB, using the feature's host name and UUID
 * and an expiration time of "now" plus the hold time. On success, the lock adopts the
 * feature's host name and UUID.
 *
 * @param conn DB connection
 * @return {@code true} if a record was successfully inserted, {@code false}
 *         otherwise
 * @throws SQLException if a DB error occurs
 */
protected boolean doDbInsert(Connection conn) throws SQLException {
    final String sql = "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
                    + "values (?, ?, ?, timestampadd(second, ?, now()))";

    logger.debug("insert lock record {}", this);

    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        int col = 0;
        stmt.setString(++col, getResourceId());
        stmt.setString(++col, feature.hostName);
        stmt.setString(++col, feature.uuidString);
        stmt.setInt(++col, getHoldSec());
        stmt.executeUpdate();

        hostName = feature.hostName;
        uuidString = feature.uuidString;

        return true;
    }
}
/**
 * Updates this lock's record in the DB, provided the record is either held by this
 * lock's current host/UUID or has expired. The record is re-stamped with the feature's
 * host name and UUID and a new expiration time. On success, the lock adopts the
 * feature's host name and UUID.
 *
 * @param conn DB connection
 * @return {@code true} if a record was successfully updated, {@code false}
 *         otherwise
 * @throws SQLException if a DB error occurs
 */
protected boolean doDbUpdate(Connection conn) throws SQLException {
    final String sql = "UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
                    + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
                    + " AND ((host=? AND owner=?) OR expirationTime < now())";

    logger.debug("update lock record {}", this);

    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        // new values
        stmt.setString(1, getResourceId());
        stmt.setString(2, feature.hostName);
        stmt.setString(3, feature.uuidString);
        stmt.setInt(4, getHoldSec());

        // selection criteria
        stmt.setString(5, getResourceId());
        stmt.setString(6, this.hostName);
        stmt.setString(7, this.uuidString);

        if (stmt.executeUpdate() == 1) {
            this.hostName = feature.hostName;
            this.uuidString = feature.uuidString;
            return true;
        }

        return false;
    }
}
/**
 * Deletes the lock from the DB. Only the record matching this lock's resource ID, host
 * name and UUID is deleted; if no such record exists, nothing happens.
 *
 * @param conn DB connection
 * @throws SQLException if a DB error occurs
 */
protected void doDbDelete(Connection conn) throws SQLException {
    logger.debug("delete lock record {}", this);

    // single-table DELETE requires the FROM keyword
    try (PreparedStatement stmt = conn
                    .prepareStatement("DELETE FROM pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {

        stmt.setString(1, getResourceId());
        stmt.setString(2, this.hostName);
        stmt.setString(3, this.uuidString);
        stmt.executeUpdate();
    }
}
/**
 * Removes the lock from the map, provided it is still the entry for its resource, and,
 * unless the lock is already unavailable, denies it, sending the notification using the
 * current thread.
 */
private void removeFromMap() {
    logger.debug("remove lock from map {}", this);

    feature.resource2lock.remove(getResourceId(), this);

    synchronized (this) {
        if (isUnavailable()) {
            return;
        }

        deny(LOCK_LOST_MSG, true);
    }
}
/**
 * Builds a description of this lock, listing its state, resource ID, owner key, hold
 * time, host name and UUID string.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("DistributedLock [state=");

    text.append(getState());
    text.append(", resourceId=").append(getResourceId());
    text.append(", ownerKey=").append(getOwnerKey());
    text.append(", holdSec=").append(getHoldSec());
    text.append(", hostName=").append(hostName);
    text.append(", uuidString=").append(uuidString);
    text.append("]");

    return text.toString();
}
}
@FunctionalInterface
private static interface RunnableWithEx {
void run() throws SQLException;
}
// these may be overridden by junit tests

/**
 * Loads the named properties via the system persistence manager.
 *
 * @param fileName name of the properties to load
 * @return the properties returned by the persistence manager
 */
protected Properties getProperties(String fileName) {
    return SystemPersistenceConstants.getManager().getProperties(fileName);
}
/**
 * Gets the thread pool to be used by this manager.
 *
 * @return the executor service of the associated engine
 */
protected ScheduledExecutorService getThreadPool() {
    return engine.getExecutorService();
}
/**
 * Factory method for locks belonging to this manager.
 *
 * @param state initial state of the lock
 * @param resourceId identifier of the resource to be locked
 * @param ownerKey information identifying the owner requesting the lock
 * @param holdSec amount of time, in seconds, for which the lock should be held
 * @param callback callback to be invoked when the lock is granted or lost
 * @return a new lock associated with this manager
 */
protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
                LockCallback callback) {

    return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
}
}