From 38c3dd74dc9c43a6606680cc6df82062cd7cbd81 Mon Sep 17 00:00:00 2001 From: "Stan Bonev (sb5356)" Date: Fri, 12 Mar 2021 11:30:16 -0500 Subject: rlock: add JavaDoc Issue-ID: CCSDK-3208 Signed-off-by: Stan Bonev (sb5356) Change-Id: I89f79c96c832913c4c5423a1a8e6d41743f2b61c --- .../onap/ccsdk/features/lib/rlock/LockHelper.java | 116 ++++++- .../ccsdk/features/lib/rlock/LockHelperImpl.java | 348 ++++++++++++--------- .../ccsdk/features/lib/rlock/ResourceLock.java | 12 +- .../ccsdk/features/lib/rlock/ResourceLockDao.java | 220 ++++++------- .../lib/rlock/ResourceLockedException.java | 37 ++- .../features/lib/rlock/SynchronizedFunction.java | 113 +++++-- .../ccsdk/features/lib/rlock/TestLockHelper.java | 44 +-- .../ccsdk/features/lib/rlock/testutils/DbUtil.java | 150 +++++---- .../features/lib/rlock/testutils/FileUtil.java | 30 +- 9 files changed, 650 insertions(+), 420 deletions(-) diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java index 17817fe1d..88d016606 100644 --- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java @@ -2,13 +2,121 @@ package org.onap.ccsdk.features.lib.rlock; import java.util.Collection; +/** + *

+ * A cooperative locking service. This service can be used to synchronize access to shared + * resources, so only one thread has access to it at any time. It is cooperative locking, + * meaning the threads have to agree to explicitly lock the resource, whenever they need to access + * it and then explicitly unlock it, when they are done with the resource. While a resource is + * locked by a thread, no other thread can lock it, until it is unlocked. + *

+ *

+ * The term resource represents anything that needs a synchronized access, for + * example, message queue or connection pool or limited bandwidth capacity. + *

+ *

+ * The implementation here provides a distributed locking functionality, which means threads + * can reside in the same application instance or different application instances, on same or + * different machines (see {@link LockHelperImpl}). + *

+ *

+ * Example: Bandwidth check: + *

+ *
+ * boolean success = false;
+ * String resourceName = "Port-123-Bandwidth";
+ * int neededBandwidth = 100; // 100 Mbps
+ * int bandwidthLimit = 1000;
+ * int lockRequester = "this-thread-id-" + (int) (Math.random() * 1000000);
+ * int lockTimeout = 30; // 30 sec - the longest time, we expect to complete the check
+ *
+ * try {
+ *     lockHelper.lock(resourceName, lockRequester, lockTimeout);
+ *
+ *     int usedBandwidth = portBandwidthDao.readUsedBandwidth("Port-123");
+ *     if (usedBandwidth + neededBandwidth <= bandwidthLimit) {
+ *         portBandwidthDao.updateUsedBandwidth("Port-123", usedBandwidth + neededBandwidth);
+ *         success = true;
+ *     }
+ * } finally {
+ *     lockHelper.unlock(resourceName, true);
+ * }
+ * 
+ * + * @see SynchronizedFunction + * @see LockHelperImpl + * @see ResourceLockedException + */ public interface LockHelper { - void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */); + /** + *

+ * Locks the specified resource. Lock requester identifies the thread that is requesting the lock. + * If the resource is already locked by another lock requester (another thread), then the thread is + * blocked until the resource is available. If the resource is available or locked by the same + * thread, the lock succeeds.
+ * Usually lock requester can be generated using the thread name and some random number. + *

+ *

+ * Lock timeout specifies how long (in seconds) to keep the lock in case the thread misses to unlock + * the resource. The lock will expire after this time in case the resource is never unlocked. This + * time should be set to the maximum time expected for the processing of the resource, so the lock + * does not expire until the thread is done with the resource. The lock timeout is supposed to avoid + * permanently locking a resource in case of application crash in the middle of processing. + *

+ * + * @param resourceName Identifies the resource to be locked + * @param lockRequester Identifies the thread requesting the lock + * @param lockTimeout The expiration timeout of the lock (in seconds) + * @throws ResourceLockedException if the resource cannot be locked + */ + void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */); - void unlock(String resourceName, boolean force); + /** + *

+ * Unlocks the specified resource. This method should always succeed including in the case, when the + * resource is already unlocked. + *

+ *

+ * Force parameter can be used in case the same thread might lock the same resource multiple times. + * In case resource is locked multiple times, normally, in case force parameter is false, the thread + * will need to unlock the resource the same number of times for the resource to become available + * again. If the force parameter is true, then the resource will be unlocked immediately, no matter + * how many times it has been locked. + *

+ * + * @param resourceName Identifies the resource to be unlocked + * @param force If true, forces resource to be unlocked immediately, even if it has been locked more + * than once + */ + void unlock(String resourceName, boolean force); - void lock(Collection resourceNameList, String lockRequester, int lockTimeout /* Seconds */); + /** + *

+ * Locks multiple resources at once. It ensures that either all resources are successfully locked or + * none is. + *

+ * + * @param resourceNameList The set of resources to lock + * @param lockRequester Identifies the thread requesting the lock + * @param lockTimeout The expiration timeout of the lock (in seconds) + * @throws ResourceLockedException if any of the resources cannot be locked + * + * @see LockHelper#lock(String, String, int) + */ + void lock(Collection resourceNameList, String lockRequester, int lockTimeout /* Seconds */); - void unlock(Collection resourceNameList, boolean force); + /** + *

+ * Unlocks the specified set of resources. This method should always succeed including in the case, + * when any of the resources are already unlocked. + *

+ * + * @param resourceNameList The set of resources to unlock + * @param force If true, forces all resources to be unlocked immediately, even if any of them have + * been locked more than once + * + * @see LockHelper#unlock(String, boolean) + */ + void unlock(Collection resourceNameList, boolean force); } diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java index 666fb6af5..63fe111df 100644 --- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java @@ -5,169 +5,211 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; - import javax.sql.DataSource; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + *

+ * Implementation of the locking service, providing distributed locking functionality. It is + * done using a table in SQL Database as a single synchronization point. Hence, for this + * implementation, it is required that all participating threads in all participating applications + * access the same database instance (or a distributed database that looks like one database + * instance). + *

+ *

+ * The following table is required in the database: + *

+ * + *
+ * CREATE TABLE IF NOT EXISTS `resource_lock` (
+ *   `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ *   `resource_name` varchar(256),
+ *   `lock_holder` varchar(100) NOT NULL,
+ *   `lock_count` smallint(6) NOT NULL,
+ *   `lock_time` datetime NOT NULL,
+ *   `expiration_time` datetime NOT NULL,
+ *   PRIMARY KEY (`resource_lock_id`),
+ *   UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`)
+ * );
+ * 
+ *

+ * The implementation tries to insert records in the table for all the requested resources. If there + * are already records for any of the resources, it fails and then makes several more tries before + * giving up and throwing {@link ResourceLockedException}. + *

+ *

+ * The class has 2 configurable parameters: + *

    + *
  • retryCount: the numbers of retries, when locking a resource, default 20
  • + *
  • lockWait: the time between each retry (in seconds), default 5 seconds
  • + *
+ * The total time before locking fails would be retryCount * lockWait seconds. + *

+ * + * @see LockHelper + * @see SynchronizedFunction + * @see ResourceLockedException + */ public class LockHelperImpl implements LockHelper { - private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class); + private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class); - private int retryCount = 20; - private int lockWait = 5; // Seconds + private int retryCount = 20; + private int lockWait = 5; // Seconds - private DataSource dataSource; + private DataSource dataSource; - @Override - public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) { - lock(Collections.singleton(resourceName), lockRequester, lockTimeout); - } + @Override + public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) { + lock(Collections.singleton(resourceName), lockRequester, lockTimeout); + } - @Override - public void unlock(String resourceName, boolean force) { - unlock(Collections.singleton(resourceName), force); - } + @Override + public void unlock(String resourceName, boolean force) { + unlock(Collections.singleton(resourceName), force); + } - @Override - public void lock(Collection resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { - for (int i = 0; true; i++) { - try { - tryLock(resourceNameList, lockRequester, lockTimeout); + @Override + public void lock(Collection resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { + for (int i = 0; true; i++) { + try { + tryLock(resourceNameList, lockRequester, lockTimeout); log.info("Resources locked: " + resourceNameList); - return; - } catch (ResourceLockedException e) { - if (i > retryCount) { - throw e; - } - try { - Thread.sleep(lockWait * 1000L); - } catch (InterruptedException ex) { - } - } - } - } - - @Override - public void unlock(Collection lockNames, boolean force) { - if (lockNames == null || lockNames.size() == 0) { - return; - } - - try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { - try { - for (String name : lockNames) { - ResourceLock l = resourceLockDao.getByResourceName(name); - if (l != null) { - if (force || l.lockCount == 1) { - resourceLockDao.delete(l.id); - } else { - resourceLockDao.decrementLockCount(l.id); - } - } - } - resourceLockDao.commit(); - log.info("Resources unlocked: " + lockNames); - } catch (Exception e) { - resourceLockDao.rollback(); - } - } - } - - public void tryLock(Collection resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { - if (resourceNameList == null || resourceNameList.isEmpty()) { - return; - } - - lockRequester = generateLockRequester(lockRequester, 100); - - // First check if all requested records are available to lock - - Date now = new Date(); - - try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { - try { - List dbLockList = new ArrayList<>(); - List insertLockNameList = new ArrayList<>(); - for (String name : resourceNameList) { - ResourceLock l = resourceLockDao.getByResourceName(name); - - boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0; - if (!canLock) { - throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester); - } - - if (l != null) { - if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) { - l.lockCount = 0; - } - dbLockList.add(l); - } else { - insertLockNameList.add(name); - } - } - - // Update the lock info in DB - for (ResourceLock l : dbLockList) { - resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), l.lockCount + 1); - } - - // Insert records for those that are not yet there - for (String lockName : insertLockNameList) { - ResourceLock l = new ResourceLock(); - l.resourceName = lockName; - l.lockHolder = lockRequester; - l.lockTime = now; - l.expirationTime = new Date(now.getTime() + lockTimeout * 1000); - l.lockCount = 1; - - try { - resourceLockDao.add(l); - } catch (Exception e) { - throw new ResourceLockedException(l.resourceName, "unknown", lockRequester); - } - } - - resourceLockDao.commit(); - - } catch (Exception e) { - resourceLockDao.rollback(); - throw e; - } - } - } - - private static String generateLockRequester(String name, int maxLength) { - if (name == null) { - name = ""; - } - int l1 = name.length(); - String tname = Thread.currentThread().getName(); - int l2 = tname.length(); - if (l1 + l2 + 1 > maxLength) { - int maxl1 = maxLength / 2; - if (l1 > maxl1) { - name = name.substring(0, maxl1); - l1 = maxl1; - } - int maxl2 = maxLength - l1 - 1; - if (l2 > maxl2) { - tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9); - } - } - return tname + '-' + name; - } - - public void setRetryCount(int retryCount) { - this.retryCount = retryCount; - } - - public void setLockWait(int lockWait) { - this.lockWait = lockWait; - } - - public void setDataSource(DataSource dataSource) { - this.dataSource = dataSource; - } + return; + } catch (ResourceLockedException e) { + if (i > retryCount) { + throw e; + } + try { + Thread.sleep(lockWait * 1000L); + } catch (InterruptedException ex) { + } + } + } + } + + @Override + public void unlock(Collection lockNames, boolean force) { + if (lockNames == null || lockNames.size() == 0) { + return; + } + + try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { + try { + for (String name : lockNames) { + ResourceLock l = resourceLockDao.getByResourceName(name); + if (l != null) { + if (force || l.lockCount == 1) { + resourceLockDao.delete(l.id); + } else { + resourceLockDao.decrementLockCount(l.id); + } + } + } + resourceLockDao.commit(); + log.info("Resources unlocked: " + lockNames); + } catch (Exception e) { + resourceLockDao.rollback(); + } + } + } + + public void tryLock(Collection resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { + if (resourceNameList == null || resourceNameList.isEmpty()) { + return; + } + + lockRequester = generateLockRequester(lockRequester, 100); + + // First check if all requested records are available to lock + + Date now = new Date(); + + try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { + try { + List dbLockList = new ArrayList<>(); + List insertLockNameList = new ArrayList<>(); + for (String name : resourceNameList) { + ResourceLock l = resourceLockDao.getByResourceName(name); + + boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() + || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0; + if (!canLock) { + throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester); + } + + if (l != null) { + if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) { + l.lockCount = 0; + } + dbLockList.add(l); + } else { + insertLockNameList.add(name); + } + } + + // Update the lock info in DB + for (ResourceLock l : dbLockList) { + resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), + l.lockCount + 1); + } + + // Insert records for those that are not yet there + for (String lockName : insertLockNameList) { + ResourceLock l = new ResourceLock(); + l.resourceName = lockName; + l.lockHolder = lockRequester; + l.lockTime = now; + l.expirationTime = new Date(now.getTime() + lockTimeout * 1000); + l.lockCount = 1; + + try { + resourceLockDao.add(l); + } catch (Exception e) { + throw new ResourceLockedException(l.resourceName, "unknown", lockRequester); + } + } + + resourceLockDao.commit(); + + } catch (Exception e) { + resourceLockDao.rollback(); + throw e; + } + } + } + + private static String generateLockRequester(String name, int maxLength) { + if (name == null) { + name = ""; + } + int l1 = name.length(); + String tname = Thread.currentThread().getName(); + int l2 = tname.length(); + if (l1 + l2 + 1 > maxLength) { + int maxl1 = maxLength / 2; + if (l1 > maxl1) { + name = name.substring(0, maxl1); + l1 = maxl1; + } + int maxl2 = maxLength - l1 - 1; + if (l2 > maxl2) { + tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9); + } + } + return tname + '-' + name; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public void setLockWait(int lockWait) { + this.lockWait = lockWait; + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } } diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java index a7e966855..a51523859 100644 --- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java @@ -4,10 +4,10 @@ import java.util.Date; public class ResourceLock { - public long id; - public String resourceName; - public String lockHolder; - public int lockCount; - public Date lockTime; - public Date expirationTime; + public long id; + public String resourceName; + public String lockHolder; + public int lockCount; + public Date lockTime; + public Date expirationTime; } diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java index 4833bb28e..23328577a 100644 --- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java @@ -6,117 +6,119 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.Date; - import javax.sql.DataSource; public class ResourceLockDao implements AutoCloseable { - private Connection con; - - public ResourceLockDao(DataSource dataSource) { - try { - con = dataSource.getConnection(); - con.setAutoCommit(false); - } catch (SQLException e) { - throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e); - } - } - - public void add(ResourceLock l) { - String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + "VALUES (?, ?, ?, ?, ?)"; - - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setString(1, l.resourceName); - ps.setString(2, l.lockHolder); - ps.setInt(3, l.lockCount); - ps.setTimestamp(4, new Timestamp(l.lockTime.getTime())); - ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime())); - ps.execute(); - } catch (SQLException e) { - throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e); - } - } - - public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) { - String sql = "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ? WHERE resource_lock_id = ?"; - - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setString(1, lockHolder); - ps.setTimestamp(2, new Timestamp(lockTime.getTime())); - ps.setTimestamp(3, new Timestamp(expirationTime.getTime())); - ps.setInt(4, lockCount); - ps.setLong(5, id); - ps.execute(); - } catch (SQLException e) { - throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e); - } - } - - public ResourceLock getByResourceName(String resourceName) { - String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?"; - - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setString(1, resourceName); - try (ResultSet rs = ps.executeQuery()) { - if (rs.next()) { - ResourceLock rl = new ResourceLock(); - rl.id = rs.getLong("resource_lock_id"); - rl.resourceName = rs.getString("resource_name"); - rl.lockHolder = rs.getString("lock_holder"); - rl.lockCount = rs.getInt("lock_count"); - rl.lockTime = rs.getTimestamp("lock_time"); - rl.expirationTime = rs.getTimestamp("expiration_time"); - return rl; - } - return null; - } - } catch (SQLException e) { - throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e); - } - } - - public void delete(long id) { - String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?"; - - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setLong(1, id); - ps.execute(); - } catch (SQLException e) { - throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e); - } - } - - public void decrementLockCount(long id) { - String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?"; - - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setLong(1, id); - ps.execute(); - } catch (SQLException e) { - throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e); - } - } - - public void commit() { - try { - con.commit(); - } catch (SQLException e) { - throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e); - } - } - - public void rollback() { - try { - con.rollback(); - } catch (SQLException e) { - } - } - - @Override - public void close() { - try { - con.close(); - } catch (SQLException e) { - } - } + private Connection con; + + public ResourceLockDao(DataSource dataSource) { + try { + con = dataSource.getConnection(); + con.setAutoCommit(false); + } catch (SQLException e) { + throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e); + } + } + + public void add(ResourceLock l) { + String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + + "VALUES (?, ?, ?, ?, ?)"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, l.resourceName); + ps.setString(2, l.lockHolder); + ps.setInt(3, l.lockCount); + ps.setTimestamp(4, new Timestamp(l.lockTime.getTime())); + ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime())); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e); + } + } + + public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) { + String sql = + "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ?\n" + + "WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, lockHolder); + ps.setTimestamp(2, new Timestamp(lockTime.getTime())); + ps.setTimestamp(3, new Timestamp(expirationTime.getTime())); + ps.setInt(4, lockCount); + ps.setLong(5, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e); + } + } + + public ResourceLock getByResourceName(String resourceName) { + String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, resourceName); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + ResourceLock rl = new ResourceLock(); + rl.id = rs.getLong("resource_lock_id"); + rl.resourceName = rs.getString("resource_name"); + rl.lockHolder = rs.getString("lock_holder"); + rl.lockCount = rs.getInt("lock_count"); + rl.lockTime = rs.getTimestamp("lock_time"); + rl.expirationTime = rs.getTimestamp("expiration_time"); + return rl; + } + return null; + } + } catch (SQLException e) { + throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e); + } + } + + public void delete(long id) { + String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e); + } + } + + public void decrementLockCount(long id) { + String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e); + } + } + + public void commit() { + try { + con.commit(); + } catch (SQLException e) { + throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e); + } + } + + public void rollback() { + try { + con.rollback(); + } catch (SQLException e) { + } + } + + @Override + public void close() { + try { + con.close(); + } catch (SQLException e) { + } + } } diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java index 7c8cfa122..c79c00f57 100644 --- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java @@ -1,20 +1,33 @@ package org.onap.ccsdk.features.lib.rlock; +/** + *

+ * This exception is thrown by {@link LockHelper#lock(String, String, int) LockHelper.lock} methods, + * if a requested resource cannot be locked. This might happen, because another threads keeps a + * resource locked for a long time or there is a stale lock that hasn't expired yet. + *

+ *

+ * The exception message will contain the locked resource and what lock requester (thread) holds the + * resource locked. + *

+ * + * @see LockHelperImpl + */ public class ResourceLockedException extends RuntimeException { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - private String lockName, lockHolder, lockRequester; + private String lockName, lockHolder, lockRequester; - public ResourceLockedException(String lockName, String lockHolder, String lockRequester) { - this.lockName = lockName; - this.lockHolder = lockHolder; - this.lockRequester = lockRequester; - } + public ResourceLockedException(String lockName, String lockHolder, String lockRequester) { + this.lockName = lockName; + this.lockHolder = lockHolder; + this.lockRequester = lockRequester; + } - @Override - public String getMessage() { - return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder + - "]."; - } + @Override + public String getMessage() { + return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder + + "]."; + } } diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java index ff25e16f0..e92700055 100644 --- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java @@ -4,32 +4,101 @@ import java.util.Collection; import java.util.HashSet; import java.util.Set; +/** + *

+ * A simple abstract base class, providing functionality similar to the synchronized block + * in Java. Derived class provides the set of resources that need synchronized access and the + * processing method that is executed while the set of resources is locked (override _exec + * method). This class uses the {@link LockHelper} service to lock the resources, execute the + * processing method and then unlock the resources. + *

+ *

+ * Example: + *

+ * + *
+ * public class BandwidthCheckFunction extends SynchronizedFunction {
+ *
+ *     private PortBandwidthDao portBandwidthDao;
+ *     private int neededBandwidth;
+ *     private int bandwidthLimit;
+ *
+ *     private boolean successful; // Output
+ *
+ *     public BandwidthCheckFunction(LockHelper lockHelper, PortBandwidthDao portBandwidthDao, String portId,
+ *             int neededBandwidth, int bandwidthLimit) {
+ *         super(lockHelper, Collections.singleton(portId + "-Bandwidth"), 60); // 60 sec lockTimeout
+ *         this.portBandwidthDao = portBandwidthDao;
+ *         this.neededBandwidth = neededBandwidth;
+ *         this.bandwidthLimit = bandwidthLimit;
+ *     }
+ *
+ *     {@literal @}Override
+ *     protected void _exec() {
+ *         int usedBandwidth = portBandwidthDao.readUsedBandwidth("Port-123");
+ *         if (usedBandwidth + neededBandwidth <= bandwidthLimit) {
+ *             portBandwidthDao.updateUsedBandwidth("Port-123", usedBandwidth + neededBandwidth);
+ *             successful = true;
+ *         } else {
+ *             successful = false;
+ *         }
+ *     }
+ *
+ *     public boolean isSuccessful() {
+ *         return successful;
+ *     }
+ * }
+ *
+ * ..........
+ *
+ *     BandwidthCheckFunction func = new BandwidthCheckFunction(lockHelper, portBandwidthDao, "Port-123", 100, 1000);
+ *     func.exec();
+ *     boolean success = func.isSuccessful();
+ * ..........
+ * 
+ * + * @see LockHelper + */ public abstract class SynchronizedFunction { - private Set synchset; - private String lockRequester; - private int lockTimeout; // Seconds - private LockHelper lockHelper; + private Set syncSet; + private String lockRequester; + private int lockTimeout; // Seconds + private LockHelper lockHelper; - protected SynchronizedFunction(LockHelper lockHelper, Collection synchset, int lockTimeout) { - this.lockHelper = lockHelper; - this.synchset = new HashSet(synchset); - this.lockRequester = generateLockRequester(); - this.lockTimeout = lockTimeout; - } + /** + * @param lockHelper {@link LockHelper} service implementation + * @param syncSet the set of resources to be locked during processing + * @param lockTimeout the lock expiration timeout (see {@link LockHelper#lock(String, String, int) + * LockHelper.lock}) + */ + protected SynchronizedFunction(LockHelper lockHelper, Collection syncSet, int lockTimeout) { + this.lockHelper = lockHelper; + this.syncSet = new HashSet<>(syncSet); + lockRequester = generateLockRequester(); + this.lockTimeout = lockTimeout; + } - protected abstract void _exec(); + /** + * Implement this method with the required processing. This method is executed while the resources + * are locked (syncSet provided in the constructor). + */ + protected abstract void _exec(); - public void exec() { - lockHelper.lock(synchset, lockRequester, lockTimeout); - try { - _exec(); - } finally { - lockHelper.unlock(synchset, true); - } - } + /** + * Call this method to execute the provided processing in the derived class (the implemented + * _exec method). + */ + public void exec() { + lockHelper.lock(syncSet, lockRequester, lockTimeout); + try { + _exec(); + } finally { + lockHelper.unlock(syncSet, true); + } + } - private static String generateLockRequester() { - return "SynchronizedFunction-" + (int) (Math.random() * 1000000); - } + private static String generateLockRequester() { + return "SynchronizedFunction-" + (int) (Math.random() * 1000000); + } } diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java index 9f37894c9..cce377e2c 100644 --- a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java @@ -7,10 +7,10 @@ import org.slf4j.LoggerFactory; public class TestLockHelper { - private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class); + private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class); - @Test - public void test1() throws Exception { + @Test + public void test1() throws Exception { LockThread t1 = new LockThread("req1"); LockThread t2 = new LockThread("req2"); LockThread t3 = new LockThread("req3"); @@ -22,30 +22,30 @@ public class TestLockHelper { t1.join(); t2.join(); t3.join(); - } + } - private class LockThread extends Thread { + private class LockThread extends Thread { - private String requester; + private String requester; - public LockThread(String requester) { - this.requester = requester; - } + public LockThread(String requester) { + this.requester = requester; + } - @Override - public void run() { - LockHelperImpl lockHelper = new LockHelperImpl(); - lockHelper.setDataSource(DbUtil.getDataSource()); + @Override + public void run() { + LockHelperImpl lockHelper = new LockHelperImpl(); + lockHelper.setDataSource(DbUtil.getDataSource()); - lockHelper.lock("resource1", requester, 20); + lockHelper.lock("resource1", requester, 20); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - log.warn("Thread interrupted: " + e.getMessage(), e); - } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.warn("Thread interrupted: " + e.getMessage(), e); + } - lockHelper.unlock("resource1", false); - } - } + lockHelper.unlock("resource1", false); + } + } } diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java index 38d4d62c1..954b532cb 100644 --- a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java @@ -6,87 +6,83 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; - import javax.sql.DataSource; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DbUtil { - private static final Logger log = LoggerFactory.getLogger(DbUtil.class); - - private static DataSource dataSource = null; - - public static synchronized DataSource getDataSource() { - if (dataSource == null) { - String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; - - dataSource = new DataSource() { - - @Override - public T unwrap(Class arg0) throws SQLException { - return null; - } - - @Override - public boolean isWrapperFor(Class arg0) throws SQLException { - return false; - } - - @Override - public void setLoginTimeout(int arg0) throws SQLException { - } - - @Override - public void setLogWriter(PrintWriter arg0) throws SQLException { - } - - @Override - public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } - - @Override - public int getLoginTimeout() throws SQLException { - return 0; - } - - @Override - public PrintWriter getLogWriter() throws SQLException { - return null; - } - - @Override - public Connection getConnection(String username, String password) throws SQLException { - return null; - } - - @Override - public Connection getConnection() throws SQLException { - return DriverManager.getConnection(url); - } - }; - - try { - String script = FileUtil.read("/schema.sql"); - - String[] sqlList = script.split(";"); - try (Connection con = dataSource.getConnection()) { - for (String sql : sqlList) { - if (!sql.trim().isEmpty()) { - sql = sql.trim(); - try (PreparedStatement ps = con.prepareStatement(sql)) { - log.info("Executing statement:\n" + sql); - ps.execute(); - } - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return dataSource; - } + private static final Logger log = LoggerFactory.getLogger(DbUtil.class); + + private static DataSource dataSource = null; + + public static synchronized DataSource getDataSource() { + if (dataSource == null) { + String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; + + dataSource = new DataSource() { + + @Override + public T unwrap(Class arg0) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class arg0) throws SQLException { + return false; + } + + @Override + public void setLoginTimeout(int arg0) throws SQLException {} + + @Override + public void setLogWriter(PrintWriter arg0) throws SQLException {} + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(url); + } + }; + + try { + String script = FileUtil.read("/schema.sql"); + + String[] sqlList = script.split(";"); + try (Connection con = dataSource.getConnection()) { + for (String sql : sqlList) { + if (!sql.trim().isEmpty()) { + sql = sql.trim(); + try (PreparedStatement ps = con.prepareStatement(sql)) { + log.info("Executing statement:\n" + sql); + ps.execute(); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return dataSource; + } } diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java index e51a3b082..5c4ccbf21 100644 --- a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java @@ -6,19 +6,19 @@ import java.io.InputStreamReader; public class FileUtil { - public static String read(String fileName) throws Exception { - String ss = ""; - try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) { - try (InputStreamReader isr = new InputStreamReader(is)) { - try (BufferedReader in = new BufferedReader(isr)) { - String s = in.readLine(); - while (s != null) { - ss += s + '\n'; - s = in.readLine(); - } - } - } - } - return ss; - } + public static String read(String fileName) throws Exception { + String ss = ""; + try (InputStream is = FileUtil.class.getResourceAsStream(fileName)) { + try (InputStreamReader isr = new InputStreamReader(is)) { + try (BufferedReader in = new BufferedReader(isr)) { + String s = in.readLine(); + while (s != null) { + ss += s + '\n'; + s = in.readLine(); + } + } + } + } + return ss; + } } -- cgit 1.2.3-korg