diff options
Diffstat (limited to 'lib/rlock')
11 files changed, 597 insertions, 0 deletions
diff --git a/lib/rlock/pom.xml b/lib/rlock/pom.xml new file mode 100644 index 000000000..51b778bad --- /dev/null +++ b/lib/rlock/pom.xml @@ -0,0 +1,43 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.features</groupId> + <artifactId>ccsdk-features</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.onap.ccsdk.features.lib.rlock</groupId> + <artifactId>rlock</artifactId> + + <description>Resource Lock - Distributed locking feature using database</description> + + <dependencies> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java new file mode 100644 index 000000000..17817fe1d --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java @@ -0,0 +1,14 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.Collection; + +public interface LockHelper { + + void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */); + + void unlock(String resourceName, boolean force); + + void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */); + + void unlock(Collection<String> resourceNameList, boolean force); +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java new file mode 100644 index 000000000..666fb6af5 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java @@ -0,0 +1,173 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LockHelperImpl implements LockHelper { + + private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class); + + private int retryCount = 20; + private int lockWait = 5; // Seconds + + private DataSource dataSource; + + @Override + public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) { + lock(Collections.singleton(resourceName), lockRequester, lockTimeout); + } + + @Override + public void unlock(String resourceName, boolean force) { + unlock(Collections.singleton(resourceName), force); + } + + @Override + public void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { + for (int i = 0; true; i++) { + try { + tryLock(resourceNameList, lockRequester, lockTimeout); + log.info("Resources locked: " + resourceNameList); + return; + } catch (ResourceLockedException e) { + if (i > retryCount) { + throw e; + } + try { + Thread.sleep(lockWait * 1000L); + } catch (InterruptedException ex) { + } + } + } + } + + @Override + public void unlock(Collection<String> lockNames, boolean force) { + if (lockNames == null || lockNames.size() == 0) { + return; + } + + try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { + try { + for (String name : lockNames) { + ResourceLock l = resourceLockDao.getByResourceName(name); + if (l != null) { + if (force || l.lockCount == 1) { + resourceLockDao.delete(l.id); + } else { + resourceLockDao.decrementLockCount(l.id); + } + } + } + resourceLockDao.commit(); + log.info("Resources unlocked: " + lockNames); + } catch (Exception e) { + resourceLockDao.rollback(); + } + } + } + + public void tryLock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { + if (resourceNameList == null || resourceNameList.isEmpty()) { + return; + } + + lockRequester = generateLockRequester(lockRequester, 100); + + // First check if all requested records are available to lock + + Date now = new Date(); + + try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { + try { + List<ResourceLock> dbLockList = new ArrayList<>(); + List<String> insertLockNameList = new ArrayList<>(); + for (String name : resourceNameList) { + ResourceLock l = resourceLockDao.getByResourceName(name); + + boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0; + if (!canLock) { + throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester); + } + + if (l != null) { + if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) { + l.lockCount = 0; + } + dbLockList.add(l); + } else { + insertLockNameList.add(name); + } + } + + // Update the lock info in DB + for (ResourceLock l : dbLockList) { + resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), l.lockCount + 1); + } + + // Insert records for those that are not yet there + for (String lockName : insertLockNameList) { + ResourceLock l = new ResourceLock(); + l.resourceName = lockName; + l.lockHolder = lockRequester; + l.lockTime = now; + l.expirationTime = new Date(now.getTime() + lockTimeout * 1000); + l.lockCount = 1; + + try { + resourceLockDao.add(l); + } catch (Exception e) { + throw new ResourceLockedException(l.resourceName, "unknown", lockRequester); + } + } + + resourceLockDao.commit(); + + } catch (Exception e) { + resourceLockDao.rollback(); + throw e; + } + } + } + + private static String generateLockRequester(String name, int maxLength) { + if (name == null) { + name = ""; + } + int l1 = name.length(); + String tname = Thread.currentThread().getName(); + int l2 = tname.length(); + if (l1 + l2 + 1 > maxLength) { + int maxl1 = maxLength / 2; + if (l1 > maxl1) { + name = name.substring(0, maxl1); + l1 = maxl1; + } + int maxl2 = maxLength - l1 - 1; + if (l2 > maxl2) { + tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9); + } + } + return tname + '-' + name; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public void setLockWait(int lockWait) { + this.lockWait = lockWait; + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java new file mode 100644 index 000000000..a7e966855 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java @@ -0,0 +1,13 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.Date; + +public class ResourceLock { + + public long id; + public String resourceName; + public String lockHolder; + public int lockCount; + public Date lockTime; + public Date expirationTime; +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java new file mode 100644 index 000000000..4833bb28e --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java @@ -0,0 +1,122 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Date; + +import javax.sql.DataSource; + +public class ResourceLockDao implements AutoCloseable { + + private Connection con; + + public ResourceLockDao(DataSource dataSource) { + try { + con = dataSource.getConnection(); + con.setAutoCommit(false); + } catch (SQLException e) { + throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e); + } + } + + public void add(ResourceLock l) { + String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + "VALUES (?, ?, ?, ?, ?)"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, l.resourceName); + ps.setString(2, l.lockHolder); + ps.setInt(3, l.lockCount); + ps.setTimestamp(4, new Timestamp(l.lockTime.getTime())); + ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime())); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e); + } + } + + public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) { + String sql = "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ? WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, lockHolder); + ps.setTimestamp(2, new Timestamp(lockTime.getTime())); + ps.setTimestamp(3, new Timestamp(expirationTime.getTime())); + ps.setInt(4, lockCount); + ps.setLong(5, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e); + } + } + + public ResourceLock getByResourceName(String resourceName) { + String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, resourceName); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + ResourceLock rl = new ResourceLock(); + rl.id = rs.getLong("resource_lock_id"); + rl.resourceName = rs.getString("resource_name"); + rl.lockHolder = rs.getString("lock_holder"); + rl.lockCount = rs.getInt("lock_count"); + rl.lockTime = rs.getTimestamp("lock_time"); + rl.expirationTime = rs.getTimestamp("expiration_time"); + return rl; + } + return null; + } + } catch (SQLException e) { + throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e); + } + } + + public void delete(long id) { + String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e); + } + } + + public void decrementLockCount(long id) { + String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e); + } + } + + public void commit() { + try { + con.commit(); + } catch (SQLException e) { + throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e); + } + } + + public void rollback() { + try { + con.rollback(); + } catch (SQLException e) { + } + } + + @Override + public void close() { + try { + con.close(); + } catch (SQLException e) { + } + } +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java new file mode 100644 index 000000000..7c8cfa122 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java @@ -0,0 +1,20 @@ +package org.onap.ccsdk.features.lib.rlock; + +public class ResourceLockedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private String lockName, lockHolder, lockRequester; + + public ResourceLockedException(String lockName, String lockHolder, String lockRequester) { + this.lockName = lockName; + this.lockHolder = lockHolder; + this.lockRequester = lockRequester; + } + + @Override + public String getMessage() { + return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder + + "]."; + } +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java new file mode 100644 index 000000000..ff25e16f0 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java @@ -0,0 +1,35 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +public abstract class SynchronizedFunction { + + private Set<String> synchset; + private String lockRequester; + private int lockTimeout; // Seconds + private LockHelper lockHelper; + + protected SynchronizedFunction(LockHelper lockHelper, Collection<String> synchset, int lockTimeout) { + this.lockHelper = lockHelper; + this.synchset = new HashSet<String>(synchset); + this.lockRequester = generateLockRequester(); + this.lockTimeout = lockTimeout; + } + + protected abstract void _exec(); + + public void exec() { + lockHelper.lock(synchset, lockRequester, lockTimeout); + try { + _exec(); + } finally { + lockHelper.unlock(synchset, true); + } + } + + private static String generateLockRequester() { + return "SynchronizedFunction-" + (int) (Math.random() * 1000000); + } +} diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java new file mode 100644 index 000000000..9f37894c9 --- /dev/null +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java @@ -0,0 +1,51 @@ +package org.onap.ccsdk.features.lib.rlock; + +import org.junit.Test; +import org.onap.ccsdk.features.lib.rlock.testutils.DbUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLockHelper { + + private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class); + + @Test + public void test1() throws Exception { + LockThread t1 = new LockThread("req1"); + LockThread t2 = new LockThread("req2"); + LockThread t3 = new LockThread("req3"); + + t1.start(); + t2.start(); + t3.start(); + + t1.join(); + t2.join(); + t3.join(); + } + + private class LockThread extends Thread { + + private String requester; + + public LockThread(String requester) { + this.requester = requester; + } + + @Override + public void run() { + LockHelperImpl lockHelper = new LockHelperImpl(); + lockHelper.setDataSource(DbUtil.getDataSource()); + + lockHelper.lock("resource1", requester, 20); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.warn("Thread interrupted: " + e.getMessage(), e); + } + + lockHelper.unlock("resource1", false); + } + } +} diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java new file mode 100644 index 000000000..38d4d62c1 --- /dev/null +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java @@ -0,0 +1,92 @@ +package org.onap.ccsdk.features.lib.rlock.testutils; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DbUtil { + + private static final Logger log = LoggerFactory.getLogger(DbUtil.class); + + private static DataSource dataSource = null; + + public static synchronized DataSource getDataSource() { + if (dataSource == null) { + String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; + + dataSource = new DataSource() { + + @Override + public <T> T unwrap(Class<T> arg0) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> arg0) throws SQLException { + return false; + } + + @Override + public void setLoginTimeout(int arg0) throws SQLException { + } + + @Override + public void setLogWriter(PrintWriter arg0) throws SQLException { + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(url); + } + }; + + try { + String script = FileUtil.read("/schema.sql"); + + String[] sqlList = script.split(";"); + try (Connection con = dataSource.getConnection()) { + for (String sql : sqlList) { + if (!sql.trim().isEmpty()) { + sql = sql.trim(); + try (PreparedStatement ps = con.prepareStatement(sql)) { + log.info("Executing statement:\n" + sql); + ps.execute(); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return dataSource; + } +} diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java new file mode 100644 index 000000000..e51a3b082 --- /dev/null +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java @@ -0,0 +1,24 @@ +package org.onap.ccsdk.features.lib.rlock.testutils; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class FileUtil { + + public static String read(String fileName) throws Exception { + String ss = ""; + try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) { + try (InputStreamReader isr = new InputStreamReader(is)) { + try (BufferedReader in = new BufferedReader(isr)) { + String s = in.readLine(); + while (s != null) { + ss += s + '\n'; + s = in.readLine(); + } + } + } + } + return ss; + } +} diff --git a/lib/rlock/src/test/resources/schema.sql b/lib/rlock/src/test/resources/schema.sql new file mode 100644 index 000000000..26f38f684 --- /dev/null +++ b/lib/rlock/src/test/resources/schema.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS `resource_lock` ( + `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `resource_name` varchar(256), + `lock_holder` varchar(100) NOT NULL, + `lock_count` smallint(6) NOT NULL, + `lock_time` datetime NOT NULL, + `expiration_time` datetime NOT NULL, + PRIMARY KEY (`resource_lock_id`), + UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`) +); |