summaryrefslogtreecommitdiffstats
path: root/lib/rlock
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rlock')
-rw-r--r--lib/rlock/pom.xml43
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java14
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java173
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java13
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java122
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java20
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java35
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java51
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java92
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java24
-rw-r--r--lib/rlock/src/test/resources/schema.sql10
11 files changed, 597 insertions, 0 deletions
diff --git a/lib/rlock/pom.xml b/lib/rlock/pom.xml
new file mode 100644
index 000000000..51b778bad
--- /dev/null
+++ b/lib/rlock/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.features</groupId>
+ <artifactId>ccsdk-features</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.ccsdk.features.lib.rlock</groupId>
+ <artifactId>rlock</artifactId>
+
+ <description>Resource Lock - Distributed locking feature using database</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
new file mode 100644
index 000000000..17817fe1d
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
@@ -0,0 +1,14 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.Collection;
+
+public interface LockHelper {
+
+ void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */);
+
+ void unlock(String resourceName, boolean force);
+
+ void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */);
+
+ void unlock(Collection<String> resourceNameList, boolean force);
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
new file mode 100644
index 000000000..666fb6af5
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
@@ -0,0 +1,173 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LockHelperImpl implements LockHelper {
+
+ private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class);
+
+ private int retryCount = 20;
+ private int lockWait = 5; // Seconds
+
+ private DataSource dataSource;
+
+ @Override
+ public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) {
+ lock(Collections.singleton(resourceName), lockRequester, lockTimeout);
+ }
+
+ @Override
+ public void unlock(String resourceName, boolean force) {
+ unlock(Collections.singleton(resourceName), force);
+ }
+
+ @Override
+ public void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
+ for (int i = 0; true; i++) {
+ try {
+ tryLock(resourceNameList, lockRequester, lockTimeout);
+ log.info("Resources locked: " + resourceNameList);
+ return;
+ } catch (ResourceLockedException e) {
+ if (i > retryCount) {
+ throw e;
+ }
+ try {
+ Thread.sleep(lockWait * 1000L);
+ } catch (InterruptedException ex) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unlock(Collection<String> lockNames, boolean force) {
+ if (lockNames == null || lockNames.size() == 0) {
+ return;
+ }
+
+ try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
+ try {
+ for (String name : lockNames) {
+ ResourceLock l = resourceLockDao.getByResourceName(name);
+ if (l != null) {
+ if (force || l.lockCount == 1) {
+ resourceLockDao.delete(l.id);
+ } else {
+ resourceLockDao.decrementLockCount(l.id);
+ }
+ }
+ }
+ resourceLockDao.commit();
+ log.info("Resources unlocked: " + lockNames);
+ } catch (Exception e) {
+ resourceLockDao.rollback();
+ }
+ }
+ }
+
+ public void tryLock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
+ if (resourceNameList == null || resourceNameList.isEmpty()) {
+ return;
+ }
+
+ lockRequester = generateLockRequester(lockRequester, 100);
+
+ // First check if all requested records are available to lock
+
+ Date now = new Date();
+
+ try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
+ try {
+ List<ResourceLock> dbLockList = new ArrayList<>();
+ List<String> insertLockNameList = new ArrayList<>();
+ for (String name : resourceNameList) {
+ ResourceLock l = resourceLockDao.getByResourceName(name);
+
+ boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0;
+ if (!canLock) {
+ throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester);
+ }
+
+ if (l != null) {
+ if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) {
+ l.lockCount = 0;
+ }
+ dbLockList.add(l);
+ } else {
+ insertLockNameList.add(name);
+ }
+ }
+
+ // Update the lock info in DB
+ for (ResourceLock l : dbLockList) {
+ resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), l.lockCount + 1);
+ }
+
+ // Insert records for those that are not yet there
+ for (String lockName : insertLockNameList) {
+ ResourceLock l = new ResourceLock();
+ l.resourceName = lockName;
+ l.lockHolder = lockRequester;
+ l.lockTime = now;
+ l.expirationTime = new Date(now.getTime() + lockTimeout * 1000);
+ l.lockCount = 1;
+
+ try {
+ resourceLockDao.add(l);
+ } catch (Exception e) {
+ throw new ResourceLockedException(l.resourceName, "unknown", lockRequester);
+ }
+ }
+
+ resourceLockDao.commit();
+
+ } catch (Exception e) {
+ resourceLockDao.rollback();
+ throw e;
+ }
+ }
+ }
+
+ private static String generateLockRequester(String name, int maxLength) {
+ if (name == null) {
+ name = "";
+ }
+ int l1 = name.length();
+ String tname = Thread.currentThread().getName();
+ int l2 = tname.length();
+ if (l1 + l2 + 1 > maxLength) {
+ int maxl1 = maxLength / 2;
+ if (l1 > maxl1) {
+ name = name.substring(0, maxl1);
+ l1 = maxl1;
+ }
+ int maxl2 = maxLength - l1 - 1;
+ if (l2 > maxl2) {
+ tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9);
+ }
+ }
+ return tname + '-' + name;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public void setLockWait(int lockWait) {
+ this.lockWait = lockWait;
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
new file mode 100644
index 000000000..a7e966855
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
@@ -0,0 +1,13 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.Date;
+
+public class ResourceLock {
+
+ public long id;
+ public String resourceName;
+ public String lockHolder;
+ public int lockCount;
+ public Date lockTime;
+ public Date expirationTime;
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
new file mode 100644
index 000000000..4833bb28e
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
@@ -0,0 +1,122 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+
+import javax.sql.DataSource;
+
+public class ResourceLockDao implements AutoCloseable {
+
+ private Connection con;
+
+ public ResourceLockDao(DataSource dataSource) {
+ try {
+ con = dataSource.getConnection();
+ con.setAutoCommit(false);
+ } catch (SQLException e) {
+ throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e);
+ }
+ }
+
+ public void add(ResourceLock l) {
+ String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + "VALUES (?, ?, ?, ?, ?)";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, l.resourceName);
+ ps.setString(2, l.lockHolder);
+ ps.setInt(3, l.lockCount);
+ ps.setTimestamp(4, new Timestamp(l.lockTime.getTime()));
+ ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime()));
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) {
+ String sql = "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ? WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, lockHolder);
+ ps.setTimestamp(2, new Timestamp(lockTime.getTime()));
+ ps.setTimestamp(3, new Timestamp(expirationTime.getTime()));
+ ps.setInt(4, lockCount);
+ ps.setLong(5, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e);
+ }
+ }
+
+ public ResourceLock getByResourceName(String resourceName) {
+ String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, resourceName);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ ResourceLock rl = new ResourceLock();
+ rl.id = rs.getLong("resource_lock_id");
+ rl.resourceName = rs.getString("resource_name");
+ rl.lockHolder = rs.getString("lock_holder");
+ rl.lockCount = rs.getInt("lock_count");
+ rl.lockTime = rs.getTimestamp("lock_time");
+ rl.expirationTime = rs.getTimestamp("expiration_time");
+ return rl;
+ }
+ return null;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void delete(long id) {
+ String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void decrementLockCount(long id) {
+ String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void commit() {
+ try {
+ con.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e);
+ }
+ }
+
+ public void rollback() {
+ try {
+ con.rollback();
+ } catch (SQLException e) {
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ con.close();
+ } catch (SQLException e) {
+ }
+ }
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
new file mode 100644
index 000000000..7c8cfa122
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
@@ -0,0 +1,20 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+public class ResourceLockedException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ private String lockName, lockHolder, lockRequester;
+
+ public ResourceLockedException(String lockName, String lockHolder, String lockRequester) {
+ this.lockName = lockName;
+ this.lockHolder = lockHolder;
+ this.lockRequester = lockRequester;
+ }
+
+ @Override
+ public String getMessage() {
+ return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder +
+ "].";
+ }
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
new file mode 100644
index 000000000..ff25e16f0
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
@@ -0,0 +1,35 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class SynchronizedFunction {
+
+ private Set<String> synchset;
+ private String lockRequester;
+ private int lockTimeout; // Seconds
+ private LockHelper lockHelper;
+
+ protected SynchronizedFunction(LockHelper lockHelper, Collection<String> synchset, int lockTimeout) {
+ this.lockHelper = lockHelper;
+ this.synchset = new HashSet<String>(synchset);
+ this.lockRequester = generateLockRequester();
+ this.lockTimeout = lockTimeout;
+ }
+
+ protected abstract void _exec();
+
+ public void exec() {
+ lockHelper.lock(synchset, lockRequester, lockTimeout);
+ try {
+ _exec();
+ } finally {
+ lockHelper.unlock(synchset, true);
+ }
+ }
+
+ private static String generateLockRequester() {
+ return "SynchronizedFunction-" + (int) (Math.random() * 1000000);
+ }
+}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
new file mode 100644
index 000000000..9f37894c9
--- /dev/null
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
@@ -0,0 +1,51 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import org.junit.Test;
+import org.onap.ccsdk.features.lib.rlock.testutils.DbUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestLockHelper {
+
+ private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class);
+
+ @Test
+ public void test1() throws Exception {
+ LockThread t1 = new LockThread("req1");
+ LockThread t2 = new LockThread("req2");
+ LockThread t3 = new LockThread("req3");
+
+ t1.start();
+ t2.start();
+ t3.start();
+
+ t1.join();
+ t2.join();
+ t3.join();
+ }
+
+ private class LockThread extends Thread {
+
+ private String requester;
+
+ public LockThread(String requester) {
+ this.requester = requester;
+ }
+
+ @Override
+ public void run() {
+ LockHelperImpl lockHelper = new LockHelperImpl();
+ lockHelper.setDataSource(DbUtil.getDataSource());
+
+ lockHelper.lock("resource1", requester, 20);
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted: " + e.getMessage(), e);
+ }
+
+ lockHelper.unlock("resource1", false);
+ }
+ }
+}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
new file mode 100644
index 000000000..38d4d62c1
--- /dev/null
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
@@ -0,0 +1,92 @@
+package org.onap.ccsdk.features.lib.rlock.testutils;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
+
+ private static DataSource dataSource = null;
+
+ public static synchronized DataSource getDataSource() {
+ if (dataSource == null) {
+ String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
+
+ dataSource = new DataSource() {
+
+ @Override
+ public <T> T unwrap(Class<T> arg0) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> arg0) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setLoginTimeout(int arg0) throws SQLException {
+ }
+
+ @Override
+ public void setLogWriter(PrintWriter arg0) throws SQLException {
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return null;
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(url);
+ }
+ };
+
+ try {
+ String script = FileUtil.read("/schema.sql");
+
+ String[] sqlList = script.split(";");
+ try (Connection con = dataSource.getConnection()) {
+ for (String sql : sqlList) {
+ if (!sql.trim().isEmpty()) {
+ sql = sql.trim();
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ log.info("Executing statement:\n" + sql);
+ ps.execute();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return dataSource;
+ }
+}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
new file mode 100644
index 000000000..e51a3b082
--- /dev/null
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
@@ -0,0 +1,24 @@
+package org.onap.ccsdk.features.lib.rlock.testutils;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class FileUtil {
+
+ public static String read(String fileName) throws Exception {
+ String ss = "";
+ try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) {
+ try (InputStreamReader isr = new InputStreamReader(is)) {
+ try (BufferedReader in = new BufferedReader(isr)) {
+ String s = in.readLine();
+ while (s != null) {
+ ss += s + '\n';
+ s = in.readLine();
+ }
+ }
+ }
+ }
+ return ss;
+ }
+}
diff --git a/lib/rlock/src/test/resources/schema.sql b/lib/rlock/src/test/resources/schema.sql
new file mode 100644
index 000000000..26f38f684
--- /dev/null
+++ b/lib/rlock/src/test/resources/schema.sql
@@ -0,0 +1,10 @@
+CREATE TABLE IF NOT EXISTS `resource_lock` (
+ `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `resource_name` varchar(256),
+ `lock_holder` varchar(100) NOT NULL,
+ `lock_count` smallint(6) NOT NULL,
+ `lock_time` datetime NOT NULL,
+ `expiration_time` datetime NOT NULL,
+ PRIMARY KEY (`resource_lock_id`),
+ UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`)
+);