summaryrefslogtreecommitdiffstats
path: root/lib/rlock
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rlock')
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java116
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java348
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java12
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java220
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java37
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java113
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java44
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java150
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java30
9 files changed, 650 insertions, 420 deletions
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
index 17817fe1d..88d016606 100644
--- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
@@ -2,13 +2,121 @@ package org.onap.ccsdk.features.lib.rlock;
import java.util.Collection;
+/**
+ * <p>
+ * A cooperative locking service. This service can be used to synchronize access to shared
+ * resources, so only one thread has access to it at any time. It is <i>cooperative</i> locking,
+ * meaning the threads have to agree to explicitly lock the resource, whenever they need to access
+ * it and then explicitly unlock it, when they are done with the resource. While a resource is
+ * locked by a thread, no other thread can lock it, until it is unlocked.
+ * </p>
+ * <p>
+ * The term <tt><b>resource</b></tt> represents anything that needs a synchronized access, for
+ * example, message queue or connection pool or limited bandwidth capacity.
+ * </p>
+ * <p>
+ * The implementation here provides a <i>distributed</i> locking functionality, which means threads
+ * can reside in the same application instance or different application instances, on same or
+ * different machines (see {@link LockHelperImpl}).
+ * </p>
+ * <p>
+ * Example: Bandwidth check:
+ * </p>
+ * <pre style="color:darkblue;">
+ * boolean success = false;
+ * String resourceName = "Port-123-Bandwidth";
+ * int neededBandwidth = 100; // 100 Mbps
+ * int bandwidthLimit = 1000;
+ * int lockRequester = "this-thread-id-" + (int) (Math.random() * 1000000);
+ * int lockTimeout = 30; // 30 sec - the longest time, we expect to complete the check
+ *
+ * try {
+ * lockHelper.lock(resourceName, lockRequester, lockTimeout);
+ *
+ * int usedBandwidth = portBandwidthDao.readUsedBandwidth("Port-123");
+ * if (usedBandwidth + neededBandwidth <= bandwidthLimit) {
+ * portBandwidthDao.updateUsedBandwidth("Port-123", usedBandwidth + neededBandwidth);
+ * success = true;
+ * }
+ * } finally {
+ * lockHelper.unlock(resourceName, true);
+ * }
+ * </pre>
+ *
+ * @see SynchronizedFunction
+ * @see LockHelperImpl
+ * @see ResourceLockedException
+ */
public interface LockHelper {
- void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */);
+ /**
+ * <p>
+ * Locks the specified resource. Lock requester identifies the thread that is requesting the lock.
+ * If the resource is already locked by another lock requester (another thread), then the thread is
+ * blocked until the resource is available. If the resource is available or locked by the same
+ * thread, the lock succeeds.<br/>
+ * Usually lock requester can be generated using the thread name and some random number.
+ * </p>
+ * <p>
+ * Lock timeout specifies how long (in seconds) to keep the lock in case the thread misses to unlock
+ * the resource. The lock will expire after this time in case the resource is never unlocked. This
+ * time should be set to the maximum time expected for the processing of the resource, so the lock
+ * does not expire until the thread is done with the resource. The lock timeout is supposed to avoid
+ * permanently locking a resource in case of application crash in the middle of processing.
+ * </p>
+ *
+ * @param resourceName Identifies the resource to be locked
+ * @param lockRequester Identifies the thread requesting the lock
+ * @param lockTimeout The expiration timeout of the lock (in seconds)
+ * @throws ResourceLockedException if the resource cannot be locked
+ */
+ void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */);
- void unlock(String resourceName, boolean force);
+ /**
+ * <p>
+ * Unlocks the specified resource. This method should always succeed including in the case, when the
+ * resource is already unlocked.
+ * </p>
+ * <p>
+ * Force parameter can be used in case the same thread might lock the same resource multiple times.
+ * In case resource is locked multiple times, normally, in case force parameter is false, the thread
+ * will need to unlock the resource the same number of times for the resource to become available
+ * again. If the force parameter is true, then the resource will be unlocked immediately, no matter
+ * how many times it has been locked.
+ * </p>
+ *
+ * @param resourceName Identifies the resource to be unlocked
+ * @param force If true, forces resource to be unlocked immediately, even if it has been locked more
+ * than once
+ */
+ void unlock(String resourceName, boolean force);
- void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */);
+ /**
+ * <p>
+ * Locks multiple resources at once. It ensures that either all resources are successfully locked or
+ * none is.
+ * </p>
+ *
+ * @param resourceNameList The set of resources to lock
+ * @param lockRequester Identifies the thread requesting the lock
+ * @param lockTimeout The expiration timeout of the lock (in seconds)
+ * @throws ResourceLockedException if any of the resources cannot be locked
+ *
+ * @see LockHelper#lock(String, String, int)
+ */
+ void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */);
- void unlock(Collection<String> resourceNameList, boolean force);
+ /**
+ * <p>
+ * Unlocks the specified set of resources. This method should always succeed including in the case,
+ * when any of the resources are already unlocked.
+ * </p>
+ *
+ * @param resourceNameList The set of resources to unlock
+ * @param force If true, forces all resources to be unlocked immediately, even if any of them have
+ * been locked more than once
+ *
+ * @see LockHelper#unlock(String, boolean)
+ */
+ void unlock(Collection<String> resourceNameList, boolean force);
}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
index 666fb6af5..63fe111df 100644
--- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
@@ -5,169 +5,211 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-
import javax.sql.DataSource;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * <p>
+ * Implementation of the locking service, providing <i>distributed</i> locking functionality. It is
+ * done using a table in SQL Database as a single synchronization point. Hence, for this
+ * implementation, it is required that all participating threads in all participating applications
+ * access the same database instance (or a distributed database that looks like one database
+ * instance).
+ * </p>
+ * <p>
+ * The following table is required in the database:
+ * </p>
+ *
+ * <pre style="color:darkblue;">
+ * CREATE TABLE IF NOT EXISTS `resource_lock` (
+ * `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ * `resource_name` varchar(256),
+ * `lock_holder` varchar(100) NOT NULL,
+ * `lock_count` smallint(6) NOT NULL,
+ * `lock_time` datetime NOT NULL,
+ * `expiration_time` datetime NOT NULL,
+ * PRIMARY KEY (`resource_lock_id`),
+ * UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`)
+ * );
+ * </pre>
+ * <p>
+ * The implementation tries to insert records in the table for all the requested resources. If there
+ * are already records for any of the resources, it fails and then makes several more tries before
+ * giving up and throwing {@link ResourceLockedException}.
+ * </p>
+ * <p>
+ * The class has 2 configurable parameters:
+ * <ul>
+ * <li><tt><b>retryCount</b></tt>: the numbers of retries, when locking a resource, default 20</li>
+ * <li><tt><b>lockWait</b></tt>: the time between each retry (in seconds), default 5 seconds</li>
+ * </ul>
+ * The total time before locking fails would be <tt>retryCount * lockWait</tt> seconds.
+ * </p>
+ *
+ * @see LockHelper
+ * @see SynchronizedFunction
+ * @see ResourceLockedException
+ */
public class LockHelperImpl implements LockHelper {
- private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class);
+ private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class);
- private int retryCount = 20;
- private int lockWait = 5; // Seconds
+ private int retryCount = 20;
+ private int lockWait = 5; // Seconds
- private DataSource dataSource;
+ private DataSource dataSource;
- @Override
- public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) {
- lock(Collections.singleton(resourceName), lockRequester, lockTimeout);
- }
+ @Override
+ public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) {
+ lock(Collections.singleton(resourceName), lockRequester, lockTimeout);
+ }
- @Override
- public void unlock(String resourceName, boolean force) {
- unlock(Collections.singleton(resourceName), force);
- }
+ @Override
+ public void unlock(String resourceName, boolean force) {
+ unlock(Collections.singleton(resourceName), force);
+ }
- @Override
- public void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
- for (int i = 0; true; i++) {
- try {
- tryLock(resourceNameList, lockRequester, lockTimeout);
+ @Override
+ public void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
+ for (int i = 0; true; i++) {
+ try {
+ tryLock(resourceNameList, lockRequester, lockTimeout);
log.info("Resources locked: " + resourceNameList);
- return;
- } catch (ResourceLockedException e) {
- if (i > retryCount) {
- throw e;
- }
- try {
- Thread.sleep(lockWait * 1000L);
- } catch (InterruptedException ex) {
- }
- }
- }
- }
-
- @Override
- public void unlock(Collection<String> lockNames, boolean force) {
- if (lockNames == null || lockNames.size() == 0) {
- return;
- }
-
- try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
- try {
- for (String name : lockNames) {
- ResourceLock l = resourceLockDao.getByResourceName(name);
- if (l != null) {
- if (force || l.lockCount == 1) {
- resourceLockDao.delete(l.id);
- } else {
- resourceLockDao.decrementLockCount(l.id);
- }
- }
- }
- resourceLockDao.commit();
- log.info("Resources unlocked: " + lockNames);
- } catch (Exception e) {
- resourceLockDao.rollback();
- }
- }
- }
-
- public void tryLock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
- if (resourceNameList == null || resourceNameList.isEmpty()) {
- return;
- }
-
- lockRequester = generateLockRequester(lockRequester, 100);
-
- // First check if all requested records are available to lock
-
- Date now = new Date();
-
- try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
- try {
- List<ResourceLock> dbLockList = new ArrayList<>();
- List<String> insertLockNameList = new ArrayList<>();
- for (String name : resourceNameList) {
- ResourceLock l = resourceLockDao.getByResourceName(name);
-
- boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0;
- if (!canLock) {
- throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester);
- }
-
- if (l != null) {
- if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) {
- l.lockCount = 0;
- }
- dbLockList.add(l);
- } else {
- insertLockNameList.add(name);
- }
- }
-
- // Update the lock info in DB
- for (ResourceLock l : dbLockList) {
- resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), l.lockCount + 1);
- }
-
- // Insert records for those that are not yet there
- for (String lockName : insertLockNameList) {
- ResourceLock l = new ResourceLock();
- l.resourceName = lockName;
- l.lockHolder = lockRequester;
- l.lockTime = now;
- l.expirationTime = new Date(now.getTime() + lockTimeout * 1000);
- l.lockCount = 1;
-
- try {
- resourceLockDao.add(l);
- } catch (Exception e) {
- throw new ResourceLockedException(l.resourceName, "unknown", lockRequester);
- }
- }
-
- resourceLockDao.commit();
-
- } catch (Exception e) {
- resourceLockDao.rollback();
- throw e;
- }
- }
- }
-
- private static String generateLockRequester(String name, int maxLength) {
- if (name == null) {
- name = "";
- }
- int l1 = name.length();
- String tname = Thread.currentThread().getName();
- int l2 = tname.length();
- if (l1 + l2 + 1 > maxLength) {
- int maxl1 = maxLength / 2;
- if (l1 > maxl1) {
- name = name.substring(0, maxl1);
- l1 = maxl1;
- }
- int maxl2 = maxLength - l1 - 1;
- if (l2 > maxl2) {
- tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9);
- }
- }
- return tname + '-' + name;
- }
-
- public void setRetryCount(int retryCount) {
- this.retryCount = retryCount;
- }
-
- public void setLockWait(int lockWait) {
- this.lockWait = lockWait;
- }
-
- public void setDataSource(DataSource dataSource) {
- this.dataSource = dataSource;
- }
+ return;
+ } catch (ResourceLockedException e) {
+ if (i > retryCount) {
+ throw e;
+ }
+ try {
+ Thread.sleep(lockWait * 1000L);
+ } catch (InterruptedException ex) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unlock(Collection<String> lockNames, boolean force) {
+ if (lockNames == null || lockNames.size() == 0) {
+ return;
+ }
+
+ try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
+ try {
+ for (String name : lockNames) {
+ ResourceLock l = resourceLockDao.getByResourceName(name);
+ if (l != null) {
+ if (force || l.lockCount == 1) {
+ resourceLockDao.delete(l.id);
+ } else {
+ resourceLockDao.decrementLockCount(l.id);
+ }
+ }
+ }
+ resourceLockDao.commit();
+ log.info("Resources unlocked: " + lockNames);
+ } catch (Exception e) {
+ resourceLockDao.rollback();
+ }
+ }
+ }
+
+ public void tryLock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
+ if (resourceNameList == null || resourceNameList.isEmpty()) {
+ return;
+ }
+
+ lockRequester = generateLockRequester(lockRequester, 100);
+
+ // First check if all requested records are available to lock
+
+ Date now = new Date();
+
+ try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
+ try {
+ List<ResourceLock> dbLockList = new ArrayList<>();
+ List<String> insertLockNameList = new ArrayList<>();
+ for (String name : resourceNameList) {
+ ResourceLock l = resourceLockDao.getByResourceName(name);
+
+ boolean canLock = l == null || now.getTime() > l.expirationTime.getTime()
+ || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0;
+ if (!canLock) {
+ throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester);
+ }
+
+ if (l != null) {
+ if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) {
+ l.lockCount = 0;
+ }
+ dbLockList.add(l);
+ } else {
+ insertLockNameList.add(name);
+ }
+ }
+
+ // Update the lock info in DB
+ for (ResourceLock l : dbLockList) {
+ resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000),
+ l.lockCount + 1);
+ }
+
+ // Insert records for those that are not yet there
+ for (String lockName : insertLockNameList) {
+ ResourceLock l = new ResourceLock();
+ l.resourceName = lockName;
+ l.lockHolder = lockRequester;
+ l.lockTime = now;
+ l.expirationTime = new Date(now.getTime() + lockTimeout * 1000);
+ l.lockCount = 1;
+
+ try {
+ resourceLockDao.add(l);
+ } catch (Exception e) {
+ throw new ResourceLockedException(l.resourceName, "unknown", lockRequester);
+ }
+ }
+
+ resourceLockDao.commit();
+
+ } catch (Exception e) {
+ resourceLockDao.rollback();
+ throw e;
+ }
+ }
+ }
+
+ private static String generateLockRequester(String name, int maxLength) {
+ if (name == null) {
+ name = "";
+ }
+ int l1 = name.length();
+ String tname = Thread.currentThread().getName();
+ int l2 = tname.length();
+ if (l1 + l2 + 1 > maxLength) {
+ int maxl1 = maxLength / 2;
+ if (l1 > maxl1) {
+ name = name.substring(0, maxl1);
+ l1 = maxl1;
+ }
+ int maxl2 = maxLength - l1 - 1;
+ if (l2 > maxl2) {
+ tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9);
+ }
+ }
+ return tname + '-' + name;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public void setLockWait(int lockWait) {
+ this.lockWait = lockWait;
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
index a7e966855..a51523859 100644
--- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
@@ -4,10 +4,10 @@ import java.util.Date;
public class ResourceLock {
- public long id;
- public String resourceName;
- public String lockHolder;
- public int lockCount;
- public Date lockTime;
- public Date expirationTime;
+ public long id;
+ public String resourceName;
+ public String lockHolder;
+ public int lockCount;
+ public Date lockTime;
+ public Date expirationTime;
}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
index 4833bb28e..23328577a 100644
--- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
@@ -6,117 +6,119 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
-
import javax.sql.DataSource;
public class ResourceLockDao implements AutoCloseable {
- private Connection con;
-
- public ResourceLockDao(DataSource dataSource) {
- try {
- con = dataSource.getConnection();
- con.setAutoCommit(false);
- } catch (SQLException e) {
- throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e);
- }
- }
-
- public void add(ResourceLock l) {
- String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + "VALUES (?, ?, ?, ?, ?)";
-
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setString(1, l.resourceName);
- ps.setString(2, l.lockHolder);
- ps.setInt(3, l.lockCount);
- ps.setTimestamp(4, new Timestamp(l.lockTime.getTime()));
- ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime()));
- ps.execute();
- } catch (SQLException e) {
- throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e);
- }
- }
-
- public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) {
- String sql = "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ? WHERE resource_lock_id = ?";
-
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setString(1, lockHolder);
- ps.setTimestamp(2, new Timestamp(lockTime.getTime()));
- ps.setTimestamp(3, new Timestamp(expirationTime.getTime()));
- ps.setInt(4, lockCount);
- ps.setLong(5, id);
- ps.execute();
- } catch (SQLException e) {
- throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e);
- }
- }
-
- public ResourceLock getByResourceName(String resourceName) {
- String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?";
-
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setString(1, resourceName);
- try (ResultSet rs = ps.executeQuery()) {
- if (rs.next()) {
- ResourceLock rl = new ResourceLock();
- rl.id = rs.getLong("resource_lock_id");
- rl.resourceName = rs.getString("resource_name");
- rl.lockHolder = rs.getString("lock_holder");
- rl.lockCount = rs.getInt("lock_count");
- rl.lockTime = rs.getTimestamp("lock_time");
- rl.expirationTime = rs.getTimestamp("expiration_time");
- return rl;
- }
- return null;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e);
- }
- }
-
- public void delete(long id) {
- String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?";
-
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setLong(1, id);
- ps.execute();
- } catch (SQLException e) {
- throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e);
- }
- }
-
- public void decrementLockCount(long id) {
- String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?";
-
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setLong(1, id);
- ps.execute();
- } catch (SQLException e) {
- throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e);
- }
- }
-
- public void commit() {
- try {
- con.commit();
- } catch (SQLException e) {
- throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e);
- }
- }
-
- public void rollback() {
- try {
- con.rollback();
- } catch (SQLException e) {
- }
- }
-
- @Override
- public void close() {
- try {
- con.close();
- } catch (SQLException e) {
- }
- }
+ private Connection con;
+
+ public ResourceLockDao(DataSource dataSource) {
+ try {
+ con = dataSource.getConnection();
+ con.setAutoCommit(false);
+ } catch (SQLException e) {
+ throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e);
+ }
+ }
+
+ public void add(ResourceLock l) {
+ String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n"
+ + "VALUES (?, ?, ?, ?, ?)";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, l.resourceName);
+ ps.setString(2, l.lockHolder);
+ ps.setInt(3, l.lockCount);
+ ps.setTimestamp(4, new Timestamp(l.lockTime.getTime()));
+ ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime()));
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) {
+ String sql =
+ "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ?\n"
+ + "WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, lockHolder);
+ ps.setTimestamp(2, new Timestamp(lockTime.getTime()));
+ ps.setTimestamp(3, new Timestamp(expirationTime.getTime()));
+ ps.setInt(4, lockCount);
+ ps.setLong(5, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e);
+ }
+ }
+
+ public ResourceLock getByResourceName(String resourceName) {
+ String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, resourceName);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ ResourceLock rl = new ResourceLock();
+ rl.id = rs.getLong("resource_lock_id");
+ rl.resourceName = rs.getString("resource_name");
+ rl.lockHolder = rs.getString("lock_holder");
+ rl.lockCount = rs.getInt("lock_count");
+ rl.lockTime = rs.getTimestamp("lock_time");
+ rl.expirationTime = rs.getTimestamp("expiration_time");
+ return rl;
+ }
+ return null;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void delete(long id) {
+ String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void decrementLockCount(long id) {
+ String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void commit() {
+ try {
+ con.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e);
+ }
+ }
+
+ public void rollback() {
+ try {
+ con.rollback();
+ } catch (SQLException e) {
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ con.close();
+ } catch (SQLException e) {
+ }
+ }
}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
index 7c8cfa122..c79c00f57 100644
--- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
@@ -1,20 +1,33 @@
package org.onap.ccsdk.features.lib.rlock;
+/**
+ * <p>
+ * This exception is thrown by {@link LockHelper#lock(String, String, int) LockHelper.lock} methods,
+ * if a requested resource cannot be locked. This might happen, because another threads keeps a
+ * resource locked for a long time or there is a stale lock that hasn't expired yet.
+ * </p>
+ * <p>
+ * The exception message will contain the locked resource and what lock requester (thread) holds the
+ * resource locked.
+ * </p>
+ *
+ * @see LockHelperImpl
+ */
public class ResourceLockedException extends RuntimeException {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private String lockName, lockHolder, lockRequester;
+ private String lockName, lockHolder, lockRequester;
- public ResourceLockedException(String lockName, String lockHolder, String lockRequester) {
- this.lockName = lockName;
- this.lockHolder = lockHolder;
- this.lockRequester = lockRequester;
- }
+ public ResourceLockedException(String lockName, String lockHolder, String lockRequester) {
+ this.lockName = lockName;
+ this.lockHolder = lockHolder;
+ this.lockRequester = lockRequester;
+ }
- @Override
- public String getMessage() {
- return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder +
- "].";
- }
+ @Override
+ public String getMessage() {
+ return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder
+ + "].";
+ }
}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
index ff25e16f0..e92700055 100644
--- a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
@@ -4,32 +4,101 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+/**
+ * <p>
+ * A simple abstract base class, providing functionality similar to the <tt>synchronized</tt> block
+ * in Java. Derived class provides the set of resources that need synchronized access and the
+ * processing method that is executed while the set of resources is locked (override <tt>_exec</tt>
+ * method). This class uses the {@link LockHelper} service to lock the resources, execute the
+ * processing method and then unlock the resources.
+ * </p>
+ * <p>
+ * Example:
+ * </p>
+ *
+ * <pre style="color:darkblue;">
+ * public class BandwidthCheckFunction extends SynchronizedFunction {
+ *
+ * private PortBandwidthDao portBandwidthDao;
+ * private int neededBandwidth;
+ * private int bandwidthLimit;
+ *
+ * private boolean successful; // Output
+ *
+ * public BandwidthCheckFunction(LockHelper lockHelper, PortBandwidthDao portBandwidthDao, String portId,
+ * int neededBandwidth, int bandwidthLimit) {
+ * super(lockHelper, Collections.singleton(portId + "-Bandwidth"), 60); // 60 sec lockTimeout
+ * this.portBandwidthDao = portBandwidthDao;
+ * this.neededBandwidth = neededBandwidth;
+ * this.bandwidthLimit = bandwidthLimit;
+ * }
+ *
+ * {@literal @}Override
+ * protected void _exec() {
+ * int usedBandwidth = portBandwidthDao.readUsedBandwidth("Port-123");
+ * if (usedBandwidth + neededBandwidth <= bandwidthLimit) {
+ * portBandwidthDao.updateUsedBandwidth("Port-123", usedBandwidth + neededBandwidth);
+ * successful = true;
+ * } else {
+ * successful = false;
+ * }
+ * }
+ *
+ * public boolean isSuccessful() {
+ * return successful;
+ * }
+ * }
+ *
+ * ..........
+ *
+ * BandwidthCheckFunction func = new BandwidthCheckFunction(lockHelper, portBandwidthDao, "Port-123", 100, 1000);
+ * func.exec();
+ * boolean success = func.isSuccessful();
+ * ..........
+ * </pre>
+ *
+ * @see LockHelper
+ */
public abstract class SynchronizedFunction {
- private Set<String> synchset;
- private String lockRequester;
- private int lockTimeout; // Seconds
- private LockHelper lockHelper;
+ private Set<String> syncSet;
+ private String lockRequester;
+ private int lockTimeout; // Seconds
+ private LockHelper lockHelper;
- protected SynchronizedFunction(LockHelper lockHelper, Collection<String> synchset, int lockTimeout) {
- this.lockHelper = lockHelper;
- this.synchset = new HashSet<String>(synchset);
- this.lockRequester = generateLockRequester();
- this.lockTimeout = lockTimeout;
- }
+ /**
+ * @param lockHelper {@link LockHelper} service implementation
+ * @param syncSet the set of resources to be locked during processing
+ * @param lockTimeout the lock expiration timeout (see {@link LockHelper#lock(String, String, int)
+ * LockHelper.lock})
+ */
+ protected SynchronizedFunction(LockHelper lockHelper, Collection<String> syncSet, int lockTimeout) {
+ this.lockHelper = lockHelper;
+ this.syncSet = new HashSet<>(syncSet);
+ lockRequester = generateLockRequester();
+ this.lockTimeout = lockTimeout;
+ }
- protected abstract void _exec();
+ /**
+ * Implement this method with the required processing. This method is executed while the resources
+ * are locked (<tt>syncSet</tt> provided in the constructor).
+ */
+ protected abstract void _exec();
- public void exec() {
- lockHelper.lock(synchset, lockRequester, lockTimeout);
- try {
- _exec();
- } finally {
- lockHelper.unlock(synchset, true);
- }
- }
+ /**
+ * Call this method to execute the provided processing in the derived class (the implemented
+ * <tt>_exec</tt> method).
+ */
+ public void exec() {
+ lockHelper.lock(syncSet, lockRequester, lockTimeout);
+ try {
+ _exec();
+ } finally {
+ lockHelper.unlock(syncSet, true);
+ }
+ }
- private static String generateLockRequester() {
- return "SynchronizedFunction-" + (int) (Math.random() * 1000000);
- }
+ private static String generateLockRequester() {
+ return "SynchronizedFunction-" + (int) (Math.random() * 1000000);
+ }
}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
index 9f37894c9..cce377e2c 100644
--- a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
@@ -7,10 +7,10 @@ import org.slf4j.LoggerFactory;
public class TestLockHelper {
- private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class);
+ private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class);
- @Test
- public void test1() throws Exception {
+ @Test
+ public void test1() throws Exception {
LockThread t1 = new LockThread("req1");
LockThread t2 = new LockThread("req2");
LockThread t3 = new LockThread("req3");
@@ -22,30 +22,30 @@ public class TestLockHelper {
t1.join();
t2.join();
t3.join();
- }
+ }
- private class LockThread extends Thread {
+ private class LockThread extends Thread {
- private String requester;
+ private String requester;
- public LockThread(String requester) {
- this.requester = requester;
- }
+ public LockThread(String requester) {
+ this.requester = requester;
+ }
- @Override
- public void run() {
- LockHelperImpl lockHelper = new LockHelperImpl();
- lockHelper.setDataSource(DbUtil.getDataSource());
+ @Override
+ public void run() {
+ LockHelperImpl lockHelper = new LockHelperImpl();
+ lockHelper.setDataSource(DbUtil.getDataSource());
- lockHelper.lock("resource1", requester, 20);
+ lockHelper.lock("resource1", requester, 20);
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- log.warn("Thread interrupted: " + e.getMessage(), e);
- }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted: " + e.getMessage(), e);
+ }
- lockHelper.unlock("resource1", false);
- }
- }
+ lockHelper.unlock("resource1", false);
+ }
+ }
}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
index 38d4d62c1..954b532cb 100644
--- a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
@@ -6,87 +6,83 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
-
import javax.sql.DataSource;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbUtil {
- private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
-
- private static DataSource dataSource = null;
-
- public static synchronized DataSource getDataSource() {
- if (dataSource == null) {
- String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
-
- dataSource = new DataSource() {
-
- @Override
- public <T> T unwrap(Class<T> arg0) throws SQLException {
- return null;
- }
-
- @Override
- public boolean isWrapperFor(Class<?> arg0) throws SQLException {
- return false;
- }
-
- @Override
- public void setLoginTimeout(int arg0) throws SQLException {
- }
-
- @Override
- public void setLogWriter(PrintWriter arg0) throws SQLException {
- }
-
- @Override
- public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
- return null;
- }
-
- @Override
- public int getLoginTimeout() throws SQLException {
- return 0;
- }
-
- @Override
- public PrintWriter getLogWriter() throws SQLException {
- return null;
- }
-
- @Override
- public Connection getConnection(String username, String password) throws SQLException {
- return null;
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- return DriverManager.getConnection(url);
- }
- };
-
- try {
- String script = FileUtil.read("/schema.sql");
-
- String[] sqlList = script.split(";");
- try (Connection con = dataSource.getConnection()) {
- for (String sql : sqlList) {
- if (!sql.trim().isEmpty()) {
- sql = sql.trim();
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- log.info("Executing statement:\n" + sql);
- ps.execute();
- }
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return dataSource;
- }
+ private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
+
+ private static DataSource dataSource = null;
+
+ public static synchronized DataSource getDataSource() {
+ if (dataSource == null) {
+ String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
+
+ dataSource = new DataSource() {
+
+ @Override
+ public <T> T unwrap(Class<T> arg0) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> arg0) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setLoginTimeout(int arg0) throws SQLException {}
+
+ @Override
+ public void setLogWriter(PrintWriter arg0) throws SQLException {}
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return null;
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(url);
+ }
+ };
+
+ try {
+ String script = FileUtil.read("/schema.sql");
+
+ String[] sqlList = script.split(";");
+ try (Connection con = dataSource.getConnection()) {
+ for (String sql : sqlList) {
+ if (!sql.trim().isEmpty()) {
+ sql = sql.trim();
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ log.info("Executing statement:\n" + sql);
+ ps.execute();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return dataSource;
+ }
}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
index e51a3b082..5c4ccbf21 100644
--- a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
@@ -6,19 +6,19 @@ import java.io.InputStreamReader;
public class FileUtil {
- public static String read(String fileName) throws Exception {
- String ss = "";
- try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) {
- try (InputStreamReader isr = new InputStreamReader(is)) {
- try (BufferedReader in = new BufferedReader(isr)) {
- String s = in.readLine();
- while (s != null) {
- ss += s + '\n';
- s = in.readLine();
- }
- }
- }
- }
- return ss;
- }
+ public static String read(String fileName) throws Exception {
+ String ss = "";
+ try (InputStream is = FileUtil.class.getResourceAsStream(fileName)) {
+ try (InputStreamReader isr = new InputStreamReader(is)) {
+ try (BufferedReader in = new BufferedReader(isr)) {
+ String s = in.readLine();
+ while (s != null) {
+ ss += s + '\n';
+ s = in.readLine();
+ }
+ }
+ }
+ }
+ return ss;
+ }
}