diff options
Diffstat (limited to 'feature-distributed-locking/src/main/java')
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java | 936 | ||||
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java (renamed from feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java) | 12 | ||||
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java | 136 | ||||
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java | 179 | ||||
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java | 127 | ||||
-rw-r--r-- | feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java | 282 |
6 files changed, 1078 insertions, 594 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java new file mode 100644 index 00000000..523c0d93 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java @@ -0,0 +1,936 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.dbcp2.BasicDataSourceFactory; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.drools.core.lock.AlwaysFailLock; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; +import org.onap.policy.drools.features.PolicyEngineFeatureApi; +import org.onap.policy.drools.persistence.SystemPersistenceConstants; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed implementation of the Lock Feature. Maintains locks across servers using a + * shared DB. + * + * <p/> + * Note: this implementation does <i>not</i> honor the waitForLocks={@code true} + * parameter. + * + * <p/> + * Additional Notes: + * <dl> + * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is + * instead populated with the {@link #uuidString}.</li> + * <li>A periodic check of the DB is made to determine if any of the locks have + * expired.</li> + * <li>When a lock is deserialized, it will not initially appear in this feature's map; it + * will be added to the map once free() or extend() is invoked, provided there isn't + * already an entry. In addition, it initially has the host and UUID of the feature + * instance that created it. However, as soon as doExtend() completes successfully, the + * host and UUID of the lock will be updated to reflect the values within this feature + * instance.</li> + * </dl> + */ +public class DistributedLockManager + implements PolicyResourceLockManager, PolicyEngineFeatureApi { + + private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class); + + private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; + private static final String LOCK_LOST_MSG = "lock lost"; + private static final String NOT_LOCKED_MSG = "not locked"; + + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private static DistributedLockManager latestInstance = null; + + + /** + * Name of the host on which this JVM is running. + */ + @Getter + private final String hostName; + + /** + * UUID of this object. + */ + @Getter + private final String uuidString = UUID.randomUUID().toString(); + + /** + * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a + * lock is added to the map, it remains in the map until the lock is lost or until the + * unlock request completes. + */ + private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>(); + + /** + * Engine with which this manager is associated. + */ + private PolicyEngine engine; + + /** + * Feature properties. + */ + private DistributedLockProperties featProps; + + /** + * Thread pool used to check for lock expiration and to notify owners when locks are + * granted or lost. + */ + private ScheduledExecutorService exsvc = null; + + /** + * Data source used to connect to the DB. + */ + private BasicDataSource dataSource = null; + + + /** + * Constructs the object. + */ + public DistributedLockManager() { + this.hostName = NetworkUtil.getHostname(); + } + + @Override + public int getSequenceNumber() { + return 1000; + } + + @Override + public boolean isAlive() { + return (exsvc != null); + } + + @Override + public boolean start() { + // handled via engine API + return true; + } + + @Override + public boolean stop() { + // handled via engine API + return true; + } + + @Override + public void shutdown() { + // handled via engine API + } + + @Override + public boolean isLocked() { + return false; + } + + @Override + public boolean lock() { + return true; + } + + @Override + public boolean unlock() { + return true; + } + + @Override + public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) { + + try { + this.engine = engine; + this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME)); + this.exsvc = getThreadPool(); + this.dataSource = makeDataSource(); + + return this; + + } catch (Exception e) { + throw new DistributedLockManagerException(e); + } + } + + @Override + public boolean afterStart(PolicyEngine engine) { + + try { + exsvc.execute(this::deleteExpiredDbLocks); + exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + setLatestInstance(this); + + } catch (Exception e) { + throw new DistributedLockManagerException(e); + } + + return false; + } + + /** + * Make data source. + * + * @return a new, pooled data source + * @throws Exception exception + */ + protected BasicDataSource makeDataSource() throws Exception { + Properties props = new Properties(); + props.put("driverClassName", featProps.getDbDriver()); + props.put("url", featProps.getDbUrl()); + props.put("username", featProps.getDbUser()); + props.put("password", featProps.getDbPwd()); + props.put("testOnBorrow", "true"); + props.put("poolPreparedStatements", "true"); + + // additional properties are listed in the GenericObjectPool API + + return BasicDataSourceFactory.createDataSource(props); + } + + /** + * Deletes expired locks from the DB. + */ + private void deleteExpiredDbLocks() { + logger.info("deleting all expired locks from the DB"); + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn + .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) { + + int ndel = stmt.executeUpdate(); + logger.info("deleted {} expired locks from the DB", ndel); + + } catch (SQLException e) { + logger.warn("failed to delete expired locks from the DB", e); + } + } + + /** + * Closes the data source. Does <i>not</i> invoke any lock call-backs. + */ + @Override + public boolean afterStop(PolicyEngine engine) { + exsvc = null; + closeDataSource(); + return false; + } + + /** + * Closes {@link #dataSource} and sets it to {@code null}. + */ + private void closeDataSource() { + try { + if (dataSource != null) { + dataSource.close(); + } + + } catch (SQLException e) { + logger.error("cannot close the distributed locking DB", e); + } + + dataSource = null; + } + + @Override + public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback, + boolean waitForLock) { + + if (latestInstance != this) { + AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback); + lock.notifyUnavailable(); + return lock; + } + + DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback); + + DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock); + + // do these outside of compute() to avoid blocking other map operations + if (existingLock == null) { + logger.debug("added lock to map {}", lock); + lock.scheduleRequest(lock::doLock); + } else { + lock.deny("resource is busy", true); + } + + return lock; + } + + /** + * Checks for expired locks. + */ + private void checkExpired() { + + try { + logger.info("checking for expired locks"); + Set<String> expiredIds = new HashSet<>(resource2lock.keySet()); + identifyDbLocks(expiredIds); + expireLocks(expiredIds); + + exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS); + + } catch (RejectedExecutionException e) { + logger.warn("thread pool is no longer accepting requests", e); + + } catch (SQLException | RuntimeException e) { + logger.error("error checking expired locks", e); + exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS); + } + + logger.info("done checking for expired locks"); + } + + /** + * Identifies this feature instance's locks that the DB indicates are still active. + * + * @param expiredIds IDs of resources that have expired locks. If a resource is still + * locked, it's ID is removed from this set + * @throws SQLException if a DB error occurs + */ + private void identifyDbLocks(Set<String> expiredIds) throws SQLException { + /* + * We could query for host and UUIDs that actually appear within the locks, but + * those might change while the query is running so no real value in doing that. + * On the other hand, there's only a brief instance between the time a + * deserialized lock is added to this feature instance and its doExtend() method + * updates its host and UUID to match this feature instance. If this happens to + * run during that brief instance, then the lock will be lost and the callback + * invoked. It isn't worth complicating this code further to handle those highly + * unlikely cases. + */ + + // @formatter:off + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement( + "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) { + // @formatter:on + + stmt.setString(1, hostName); + stmt.setString(2, uuidString); + + try (ResultSet resultSet = stmt.executeQuery()) { + while (resultSet.next()) { + String resourceId = resultSet.getString(1); + + // we have now seen this resource id + expiredIds.remove(resourceId); + } + } + } + } + + /** + * Expires locks for the resources that no longer appear within the DB. + * + * @param expiredIds IDs of resources that have expired locks + */ + private void expireLocks(Set<String> expiredIds) { + for (String resourceId : expiredIds) { + AtomicReference<DistributedLock> lockref = new AtomicReference<>(null); + + resource2lock.computeIfPresent(resourceId, (key, lock) -> { + if (lock.isActive()) { + // it thinks it's active, but it isn't - remove from the map + lockref.set(lock); + return null; + } + + return lock; + }); + + DistributedLock lock = lockref.get(); + if (lock != null) { + logger.debug("removed lock from map {}", lock); + lock.deny(LOCK_LOST_MSG, false); + } + } + } + + /** + * Distributed Lock implementation. + */ + public static class DistributedLock extends LockImpl { + private static final long serialVersionUID = 1L; + + /** + * Feature containing this lock. May be {@code null} until the feature is + * identified. Note: this can only be null if the lock has been de-serialized. + */ + private transient DistributedLockManager feature; + + /** + * Host name from the feature instance that created this object. Replaced with the + * host name from the current feature instance whenever the lock is successfully + * extended. + */ + private String hostName; + + /** + * UUID string from the feature instance that created this object. Replaced with + * the UUID string from the current feature instance whenever the lock is + * successfully extended. + */ + private String uuidString; + + /** + * {@code True} if the lock is busy making a request, {@code false} otherwise. + */ + private transient boolean busy = false; + + /** + * Request to be performed. + */ + private transient RunnableWithEx request = null; + + /** + * Number of times we've retried a request. + */ + private transient int nretries = 0; + + /** + * Constructs the object. + */ + public DistributedLock() { + this.hostName = ""; + this.uuidString = ""; + } + + /** + * Constructs the object. + * + * @param state initial state of the lock + * @param resourceId identifier of the resource to be locked + * @param ownerKey information identifying the owner requesting the lock + * @param holdSec amount of time, in seconds, for which the lock should be held, + * after which it will automatically be released + * @param callback callback to be invoked once the lock is granted, or + * subsequently lost; must not be {@code null} + * @param feature feature containing this lock + */ + public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback, + DistributedLockManager feature) { + super(state, resourceId, ownerKey, holdSec, callback); + + this.feature = feature; + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + } + + /** + * Grants this lock. The notification is <i>always</i> invoked via the + * <i>foreground</i> thread. + */ + protected void grant() { + synchronized (this) { + if (isUnavailable()) { + return; + } + + setState(LockState.ACTIVE); + } + + logger.info("lock granted: {}", this); + + notifyAvailable(); + } + + /** + * Permanently denies this lock. + * + * @param reason the reason the lock was denied + * @param foreground {@code true} if the callback can be invoked in the current + * (i.e., foreground) thread, {@code false} if it should be invoked via the + * executor + */ + protected void deny(String reason, boolean foreground) { + synchronized (this) { + setState(LockState.UNAVAILABLE); + } + + logger.info("{}: {}", reason, this); + + if (feature == null || foreground) { + notifyUnavailable(); + + } else { + feature.exsvc.execute(this::notifyUnavailable); + } + } + + @Override + public boolean free() { + // do a quick check of the state + if (isUnavailable()) { + return false; + } + + logger.info("releasing lock: {}", this); + + if (!attachFeature()) { + setState(LockState.UNAVAILABLE); + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + if (curlock == this && !isUnavailable()) { + // this lock was the owner + result.set(true); + setState(LockState.UNAVAILABLE); + + /* + * NOTE: do NOT return null; curlock must remain until doUnlock + * completes. + */ + } + + return curlock; + }); + + if (result.get()) { + scheduleRequest(this::doUnlock); + return true; + } + + return false; + } + + @Override + public void extend(int holdSec, LockCallback callback) { + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + setHoldSec(holdSec); + setCallback(callback); + + // do a quick check of the state + if (isUnavailable() || !attachFeature()) { + deny(LOCK_LOST_MSG, true); + return; + } + + AtomicBoolean success = new AtomicBoolean(false); + + feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> { + if (curlock == this && !isUnavailable()) { + success.set(true); + setState(LockState.WAITING); + } + + // note: leave it in the map until doUnlock() removes it + + return curlock; + }); + + if (success.get()) { + scheduleRequest(this::doExtend); + + } else { + deny(NOT_LOCKED_MSG, true); + } + } + + /** + * Attaches to the feature instance, if not already attached. + * + * @return {@code true} if the lock is now attached to a feature, {@code false} + * otherwise + */ + private synchronized boolean attachFeature() { + if (feature != null) { + // already attached + return true; + } + + feature = latestInstance; + if (feature == null) { + logger.warn("no feature yet for {}", this); + return false; + } + + // put this lock into the map + feature.resource2lock.putIfAbsent(getResourceId(), this); + + return true; + } + + /** + * Schedules a request for execution. + * + * @param schedreq the request that should be scheduled + */ + private synchronized void scheduleRequest(RunnableWithEx schedreq) { + logger.debug("schedule lock action {}", this); + nretries = 0; + request = schedreq; + feature.exsvc.execute(this::doRequest); + } + + /** + * Reschedules a request for execution, if there is not already a request in the + * queue, and if the retry count has not been exhausted. + * + * @param req request to be rescheduled + */ + private void rescheduleRequest(RunnableWithEx req) { + synchronized (this) { + if (request != null) { + // a new request has already been scheduled - it supersedes "req" + logger.debug("not rescheduling lock action {}", this); + return; + } + + if (nretries++ < feature.featProps.getMaxRetries()) { + logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this); + request = req; + feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS); + return; + } + } + + logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this); + removeFromMap(); + } + + /** + * Gets, and removes, the next request from the queue. Clears {@link #busy} if + * there are no more requests in the queue. + * + * @param prevReq the previous request that was just run + * + * @return the next request, or {@code null} if the queue is empty + */ + private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) { + if (request == null || request == prevReq) { + logger.debug("no more requests for {}", this); + busy = false; + return null; + } + + RunnableWithEx req = request; + request = null; + + return req; + } + + /** + * Executes the current request, if none are currently executing. + */ + private void doRequest() { + synchronized (this) { + if (busy) { + // another thread is already processing the request(s) + return; + } + busy = true; + } + + /* + * There is a race condition wherein this thread could invoke run() while the + * next scheduled thread checks the busy flag and finds that work is being + * done and returns, leaving the next work item in "request". In that case, + * the next work item may never be executed, thus we use a loop here, instead + * of just executing a single request. + */ + RunnableWithEx req = null; + while ((req = getNextRequest(req)) != null) { + if (feature.resource2lock.get(getResourceId()) != this) { + /* + * no longer in the map - don't apply the action, as it may interfere + * with any newly added Lock object + */ + logger.debug("discard lock action {}", this); + synchronized (this) { + busy = false; + } + return; + } + + try { + /* + * Run the request. If it throws an exception, then it will be + * rescheduled for execution a little later. + */ + req.run(); + + } catch (SQLException e) { + logger.warn("request failed for lock: {}", this, e); + + if (feature.featProps.isTransient(e.getErrorCode())) { + // retry the request a little later + rescheduleRequest(req); + } else { + removeFromMap(); + } + + } catch (RuntimeException e) { + logger.warn("request failed for lock: {}", this, e); + removeFromMap(); + } + } + } + + /** + * Attempts to add a lock to the DB. Generates a callback, indicating success or + * failure. + * + * @throws SQLException if a DB error occurs + */ + private void doLock() throws SQLException { + if (!isWaiting()) { + logger.debug("discard doLock {}", this); + return; + } + + /* + * There is a small window in which a client could invoke free() before the DB + * is updated. In that case, doUnlock will be added to the queue to run after + * this, which will delete the record, as desired. In addition, grant() will + * not do anything, because the lock state will have been set to UNAVAILABLE + * by free(). + */ + + logger.debug("doLock {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + boolean success = false; + try { + success = doDbInsert(conn); + + } catch (SQLException e) { + logger.info("failed to insert lock record - attempting update: {}", this, e); + success = doDbUpdate(conn); + } + + if (success) { + grant(); + return; + } + } + + removeFromMap(); + } + + /** + * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if + * it fails, as this should only be executed in response to a call to + * {@link #free()}. + * + * @throws SQLException if a DB error occurs + */ + private void doUnlock() throws SQLException { + logger.debug("unlock {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + doDbDelete(conn); + } + + removeFromMap(); + } + + /** + * Attempts to extend a lock in the DB. Generates a callback, indicating success + * or failure. + * + * @throws SQLException if a DB error occurs + */ + private void doExtend() throws SQLException { + if (!isWaiting()) { + logger.debug("discard doExtend {}", this); + return; + } + + /* + * There is a small window in which a client could invoke free() before the DB + * is updated. In that case, doUnlock will be added to the queue to run after + * this, which will delete the record, as desired. In addition, grant() will + * not do anything, because the lock state will have been set to UNAVAILABLE + * by free(). + */ + + logger.debug("doExtend {}", this); + try (Connection conn = feature.dataSource.getConnection()) { + /* + * invoker may have called extend() before free() had a chance to insert + * the record, thus we have to try to insert, if the update fails + */ + if (doDbUpdate(conn) || doDbInsert(conn)) { + grant(); + return; + } + } + + removeFromMap(); + } + + /** + * Inserts the lock into the DB. + * + * @param conn DB connection + * @return {@code true} if a record was successfully inserted, {@code false} + * otherwise + * @throws SQLException if a DB error occurs + */ + protected boolean doDbInsert(Connection conn) throws SQLException { + logger.debug("insert lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " + + "values (?, ?, ?, timestampadd(second, ?, now()))")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, feature.hostName); + stmt.setString(3, feature.uuidString); + stmt.setInt(4, getHoldSec()); + + stmt.executeUpdate(); + + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + + return true; + } + } + + /** + * Updates the lock in the DB. + * + * @param conn DB connection + * @return {@code true} if a record was successfully updated, {@code false} + * otherwise + * @throws SQLException if a DB error occurs + */ + protected boolean doDbUpdate(Connection conn) throws SQLException { + logger.debug("update lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?," + + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?" + + " AND ((host=? AND owner=?) OR expirationTime < now())")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, feature.hostName); + stmt.setString(3, feature.uuidString); + stmt.setInt(4, getHoldSec()); + + stmt.setString(5, getResourceId()); + stmt.setString(6, this.hostName); + stmt.setString(7, this.uuidString); + + if (stmt.executeUpdate() != 1) { + return false; + } + + this.hostName = feature.hostName; + this.uuidString = feature.uuidString; + + return true; + } + } + + /** + * Deletes the lock from the DB. + * + * @param conn DB connection + * @throws SQLException if a DB error occurs + */ + protected void doDbDelete(Connection conn) throws SQLException { + logger.debug("delete lock record {}", this); + try (PreparedStatement stmt = + conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) { + + stmt.setString(1, getResourceId()); + stmt.setString(2, this.hostName); + stmt.setString(3, this.uuidString); + + stmt.executeUpdate(); + } + } + + /** + * Removes the lock from the map, and sends a notification using the current + * thread. + */ + private void removeFromMap() { + logger.debug("remove lock from map {}", this); + feature.resource2lock.remove(getResourceId(), this); + + synchronized (this) { + if (!isUnavailable()) { + deny(LOCK_LOST_MSG, true); + } + } + } + + @Override + public String toString() { + return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey=" + + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString=" + + uuidString + "]"; + } + } + + @FunctionalInterface + private static interface RunnableWithEx { + void run() throws SQLException; + } + + // these may be overridden by junit tests + + protected Properties getProperties(String fileName) { + return SystemPersistenceConstants.getManager().getProperties(fileName); + } + + protected ScheduledExecutorService getThreadPool() { + return engine.getExecutorService(); + } + + protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec, + LockCallback callback) { + return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java index 55fc4fab..e720f9a1 100644 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManagerException.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-distributed-locking * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,15 +20,15 @@ package org.onap.policy.distributed.locking; -public class DistributedLockingFeatureException extends RuntimeException { +public class DistributedLockManagerException extends RuntimeException { private static final long serialVersionUID = 1L; /** * Constructor. - * + * * @param ex exception to be wrapped */ - public DistributedLockingFeatureException(Exception ex) { + public DistributedLockManagerException(Exception ex) { super(ex); } } diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java new file mode 100644 index 00000000..f470c8e2 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockProperties.java @@ -0,0 +1,136 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.utils.properties.BeanConfigurator; +import org.onap.policy.common.utils.properties.Property; +import org.onap.policy.common.utils.properties.exception.PropertyException; + + +@Getter +@Setter +public class DistributedLockProperties { + public static final String PREFIX = "distributed.locking."; + + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PASS = "javax.persistence.jdbc.password"; + public static final String TRANSIENT_ERROR_CODES = PREFIX + "transient.error.codes"; + public static final String EXPIRE_CHECK_SEC = PREFIX + "expire.check.seconds"; + public static final String RETRY_SEC = PREFIX + "retry.seconds"; + public static final String MAX_RETRIES = PREFIX + "max.retries"; + + /** + * Database driver. + */ + @Property(name = DB_DRIVER) + private String dbDriver; + + /** + * Database url. + */ + @Property(name = DB_URL) + private String dbUrl; + + /** + * Database user. + */ + @Property(name = DB_USER) + private String dbUser; + + /** + * Database password. + */ + @Property(name = DB_PASS) + private String dbPwd; + + /** + * Vendor-specific error codes that are "transient", meaning they may go away if the + * command is repeated (e.g., connection issue), as opposed to something like a syntax + * error or a duplicate key. + */ + @Property(name = TRANSIENT_ERROR_CODES) + private String errorCodeStrings; + + private final Set<Integer> transientErrorCodes; + + /** + * Time, in seconds, to wait between checks for expired locks. + */ + @Property(name = EXPIRE_CHECK_SEC, defaultValue = "900") + private int expireCheckSec; + + /** + * Number of seconds to wait before retrying, after a DB error. + */ + @Property(name = RETRY_SEC, defaultValue = "60") + private int retrySec; + + /** + * Maximum number of times to retry a DB operation. + */ + @Property(name = MAX_RETRIES, defaultValue = "2") + private int maxRetries; + + /** + * Constructs the object, populating fields from the properties. + * + * @param props properties from which to configure this + * @throws PropertyException if an error occurs + */ + public DistributedLockProperties(Properties props) throws PropertyException { + new BeanConfigurator().configureFromProperties(this, props); + + Set<Integer> set = new HashSet<>(); + for (String text : errorCodeStrings.split(",")) { + text = text.trim(); + if (text.isEmpty()) { + continue; + } + + try { + set.add(Integer.valueOf(text)); + + } catch (NumberFormatException e) { + throw new PropertyException(TRANSIENT_ERROR_CODES, "errorCodeStrings", e); + } + } + + transientErrorCodes = Collections.unmodifiableSet(set); + } + + /** + * Determines if an error is transient. + * + * @param errorCode error code to check + * @return {@code true} if the error is transient, {@code false} otherwise + */ + public boolean isTransient(int errorCode) { + return transientErrorCodes.contains(errorCode); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java deleted file mode 100644 index d5e07a30..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Properties; -import java.util.UUID; -import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.dbcp2.BasicDataSourceFactory; -import org.onap.policy.common.utils.properties.exception.PropertyException; -import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureApi; -import org.onap.policy.drools.features.PolicyEngineFeatureApi; -import org.onap.policy.drools.persistence.SystemPersistenceConstants; -import org.onap.policy.drools.system.PolicyEngine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DistributedLockingFeature implements PolicyEngineFeatureApi, PolicyResourceLockFeatureApi { - - /** - * Logger instance. - */ - private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class); - - /** - * Properties Configuration Name. - */ - public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; - - /** - * Properties for locking feature. - */ - private DistributedLockingProperties lockProps; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID. - */ - private static final UUID uuid = UUID.randomUUID(); - - @Override - public int getSequenceNumber() { - return 1000; - } - - @Override - public OperResult beforeLock(String resourceId, String owner, int holdSec) { - - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.lock(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeRefresh(String resourceId, String owner, int holdSec) { - - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.refresh(holdSec) ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeUnlock(String resourceId, String owner) { - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.unlock() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeIsLockedBy(String resourceId, String owner) { - TargetLock lock = new TargetLock(resourceId, uuid, owner, dataSource); - - return (lock.isActive() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public OperResult beforeIsLocked(String resourceId) { - TargetLock lock = new TargetLock(resourceId, uuid, "dummyOwner", dataSource); - - return (lock.isLocked() ? OperResult.OPER_ACCEPTED : OperResult.OPER_DENIED); - } - - @Override - public boolean afterStart(PolicyEngine engine) { - - try { - this.lockProps = new DistributedLockingProperties(SystemPersistenceConstants.getManager() - .getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME)); - this.dataSource = makeDataSource(); - } catch (PropertyException e) { - logger.error("DistributedLockingFeature feature properies have not been loaded", e); - throw new DistributedLockingFeatureException(e); - } catch (InterruptedException e) { - logger.error("DistributedLockingFeature failed to create data source", e); - Thread.currentThread().interrupt(); - throw new DistributedLockingFeatureException(e); - } catch (Exception e) { - logger.error("DistributedLockingFeature failed to create data source", e); - throw new DistributedLockingFeatureException(e); - } - - cleanLockTable(); - - return false; - } - - /** - * Make data source. - * - * @return a new, pooled data source - * @throws Exception exception - */ - protected BasicDataSource makeDataSource() throws Exception { - Properties props = new Properties(); - props.put("driverClassName", lockProps.getDbDriver()); - props.put("url", lockProps.getDbUrl()); - props.put("username", lockProps.getDbUser()); - props.put("password", lockProps.getDbPwd()); - props.put("testOnBorrow", "true"); - props.put("poolPreparedStatements", "true"); - - // additional properties are listed in the GenericObjectPool API - - return BasicDataSourceFactory.createDataSource(props); - } - - /** - * This method kills the heartbeat thread and calls refreshLockTable which removes - * any records from the db where the current host is the owner. - */ - @Override - public boolean beforeShutdown(PolicyEngine engine) { - cleanLockTable(); - return false; - } - - /** - * This method removes all records owned by the current host from the db. - */ - private void cleanLockTable() { - - try (Connection conn = dataSource.getConnection(); - PreparedStatement statement = conn.prepareStatement( - "DELETE FROM pooling.locks WHERE host = ? OR expirationTime < now()") - ) { - - statement.setString(1, uuid.toString()); - statement.executeUpdate(); - - } catch (SQLException e) { - logger.error("error in refreshLockTable()", e); - } - - } -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java deleted file mode 100644 index 0ed5930d..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.util.Properties; -import org.onap.policy.common.utils.properties.BeanConfigurator; -import org.onap.policy.common.utils.properties.Property; -import org.onap.policy.common.utils.properties.exception.PropertyException; - - -public class DistributedLockingProperties { - - /** - * Feature properties all begin with this prefix. - */ - public static final String PREFIX = "distributed.locking."; - - public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; - public static final String DB_URL = "javax.persistence.jdbc.url"; - public static final String DB_USER = "javax.persistence.jdbc.user"; - public static final String DB_PWD = "javax.persistence.jdbc.password"; - - /** - * Properties from which this was constructed. - */ - private final Properties source; - - /** - * Database driver. - */ - @Property(name = DB_DRIVER) - private String dbDriver; - - /** - * Database url. - */ - @Property(name = DB_URL) - private String dbUrl; - - /** - * Database user. - */ - @Property(name = DB_USER) - private String dbUser; - - /** - * Database password. - */ - @Property(name = DB_PWD) - private String dbPwd; - - /** - * Constructs the object, populating fields from the properties. - * - * @param props properties from which to configure this - * @throws PropertyException if an error occurs - */ - public DistributedLockingProperties(Properties props) throws PropertyException { - source = props; - - new BeanConfigurator().configureFromProperties(this, props); - } - - - public Properties getSource() { - return source; - } - - - public String getDbDriver() { - return dbDriver; - } - - - public String getDbUrl() { - return dbUrl; - } - - - public String getDbUser() { - return dbUser; - } - - - public String getDbPwd() { - return dbPwd; - } - - - public void setDbDriver(String dbDriver) { - this.dbDriver = dbDriver; - } - - - public void setDbUrl(String dbUrl) { - this.dbUrl = dbUrl; - } - - - public void setDbUser(String dbUser) { - this.dbUser = dbUser; - } - - - public void setDbPwd(String dbPwd) { - this.dbPwd = dbPwd; - } - -} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java deleted file mode 100644 index 42e1f92f..00000000 --- a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * feature-distributed-locking - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.distributed.locking; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.UUID; -import org.apache.commons.dbcp2.BasicDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TargetLock { - - private static final Logger logger = LoggerFactory.getLogger(TargetLock.class); - - /** - * The Target resource we want to lock. - */ - private String resourceId; - - /** - * Data source used to connect to the DB containing locks. - */ - private BasicDataSource dataSource; - - /** - * UUID . - */ - private UUID uuid; - - /** - * Owner. - */ - private String owner; - - /** - * Constructs a TargetLock object. - * - * @param resourceId ID of the entity we want to lock - * @param dataSource used to connect to the DB containing locks - */ - public TargetLock(String resourceId, UUID uuid, String owner, BasicDataSource dataSource) { - this.resourceId = resourceId; - this.uuid = uuid; - this.owner = owner; - this.dataSource = dataSource; - } - - /** - * Obtain a lock. - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - public boolean lock(int holdSec) { - - return grabLock(holdSec); - } - - /** - * Refresh a lock. - * - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if the lock was refreshed, {@code false} if the resource is - * not currently locked by the given owner - */ - public boolean refresh(int holdSec) { - return updateLock(holdSec); - } - - /** - * Unlock a resource by deleting it's associated record in the db. - */ - public boolean unlock() { - return deleteLock(); - } - - /** - * "Grabs" lock by attempting to insert a new record in the db. - * If the insert fails due to duplicate key error resource is already locked - * so we call secondGrab. - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - private boolean grabLock(int holdSec) { - - // try to insert a record into the table(thereby grabbing the lock) - try (Connection conn = dataSource.getConnection(); - - PreparedStatement statement = conn.prepareStatement( - "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " - + "values (?, ?, ?, timestampadd(second, ?, now()))")) { - - int index = 1; - statement.setString(index++, this.resourceId); - statement.setString(index++, this.uuid.toString()); - statement.setString(index++, this.owner); - statement.setInt(index++, holdSec); - statement.executeUpdate(); - } - - catch (SQLException e) { - logger.error("error in TargetLock.grabLock()", e); - return secondGrab(holdSec); - } - - return true; - } - - /** - * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired. - * If that fails, it attempts to insert a new record again - * @param holdSec the amount of time, in seconds, that the lock should be held - */ - private boolean secondGrab(int holdSec) { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, " - + "expirationTime = timestampadd(second, ?, now()) " - + "WHERE resourceId = ? AND expirationTime < now()"); - - PreparedStatement insertStatement = conn.prepareStatement( - "INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) " - + "values (?, ?, ?, timestampadd(second, ?, now()))");) { - - int index = 1; - updateStatement.setString(index++, this.uuid.toString()); - updateStatement.setString(index++, this.owner); - updateStatement.setInt(index++, holdSec); - updateStatement.setString(index++, this.resourceId); - - // The lock was expired and we grabbed it. - // return true - if (updateStatement.executeUpdate() == 1) { - return true; - } - - // If our update does not return 1 row, the lock either has not expired - // or it was removed. Try one last grab - else { - index = 1; - insertStatement.setString(index++, this.resourceId); - insertStatement.setString(index++, this.uuid.toString()); - insertStatement.setString(index++, this.owner); - insertStatement.setInt(index++, holdSec); - - // If our insert returns 1 we successfully grabbed the lock - return (insertStatement.executeUpdate() == 1); - } - - } catch (SQLException e) { - logger.error("error in TargetLock.secondGrab()", e); - return false; - } - - } - - /** - * Updates the DB record associated with the lock. - * - * @param holdSec the amount of time, in seconds, that the lock should be held - * @return {@code true} if the record was updated, {@code false} otherwise - */ - private boolean updateLock(int holdSec) { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement updateStatement = conn.prepareStatement( - "UPDATE pooling.locks SET host = ?, owner = ?, " - + "expirationTime = timestampadd(second, ?, now()) " - + "WHERE resourceId = ? AND owner = ? AND expirationTime >= now()")) { - - int index = 1; - updateStatement.setString(index++, this.uuid.toString()); - updateStatement.setString(index++, this.owner); - updateStatement.setInt(index++, holdSec); - updateStatement.setString(index++, this.resourceId); - updateStatement.setString(index++, this.owner); - - // refresh succeeded iff a record was updated - return (updateStatement.executeUpdate() == 1); - - } catch (SQLException e) { - logger.error("error in TargetLock.refreshLock()", e); - return false; - } - - } - - /** - *To remove a lock we simply delete the record from the db . - */ - private boolean deleteLock() { - - try (Connection conn = dataSource.getConnection(); - - PreparedStatement deleteStatement = conn.prepareStatement( - "DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?")) { - - deleteStatement.setString(1, this.resourceId); - deleteStatement.setString(2, this.owner); - deleteStatement.setString(3, this.uuid.toString()); - - return (deleteStatement.executeUpdate() == 1); - - } catch (SQLException e) { - logger.error("error in TargetLock.deleteLock()", e); - return false; - } - - } - - /** - * Is the lock active. - */ - public boolean isActive() { - try (Connection conn = dataSource.getConnection(); - - PreparedStatement selectStatement = conn.prepareStatement( - "SELECT * FROM pooling.locks " - + "WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= now()")) { - - selectStatement.setString(1, this.resourceId); - selectStatement.setString(2, this.uuid.toString()); - selectStatement.setString(3, this.owner); - try (ResultSet result = selectStatement.executeQuery()) { - - // This will return true if the - // query returned at least one row - return result.first(); - } - - } - - catch (SQLException e) { - logger.error("error in TargetLock.isActive()", e); - return false; - } - - } - - /** - * Is the resource locked. - */ - public boolean isLocked() { - - try (Connection conn = dataSource.getConnection(); - PreparedStatement selectStatement = conn - .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= now()")) { - - selectStatement.setString(1, this.resourceId); - try (ResultSet result = selectStatement.executeQuery()) { - // This will return true if the - // query returned at least one row - return result.first(); - } - } catch (SQLException e) { - logger.error("error in TargetLock.isActive()", e); - return false; - } - } - -} |