aboutsummaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src
diff options
context:
space:
mode:
authorMagnusen, Drew (dm741q) <dm741q@att.com>2018-03-21 16:44:45 -0500
committerMagnusen, Drew (dm741q) <dm741q@att.com>2018-04-03 14:05:18 -0500
commitfff9b57f7411deb798431bd625944fcfdbe053ac (patch)
treec1d7b2d23df54a61a15cd0804f7cce3b42c527f7 /feature-distributed-locking/src
parent54bc3867539264a518c88772e82ea8070ef97c79 (diff)
Implementation of distributed locking feature
This feature is a very basic implementation of a distributed locking system. Issue-ID: POLICY-699 Change-Id: I012fd37926ccbbdd87a3e4acb2788b53680115f0 Signed-off-by: Magnusen, Drew (dm741q) <dm741q@att.com>
Diffstat (limited to 'feature-distributed-locking/src')
-rw-r--r--feature-distributed-locking/src/assembly/assemble_zip.xml75
-rw-r--r--feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties34
-rw-r--r--feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql20
-rw-r--r--feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql23
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java158
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java34
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java78
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java233
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI1
-rw-r--r--feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI1
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java36
-rw-r--r--feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java220
-rw-r--r--feature-distributed-locking/src/test/resources/feature-distributed-locking.properties26
14 files changed, 1066 insertions, 0 deletions
diff --git a/feature-distributed-locking/src/assembly/assemble_zip.xml b/feature-distributed-locking/src/assembly/assemble_zip.xml
new file mode 100644
index 00000000..2112fbcd
--- /dev/null
+++ b/feature-distributed-locking/src/assembly/assemble_zip.xml
@@ -0,0 +1,75 @@
+<!--
+ ============LICENSE_START=======================================================
+ feature-distributed-locking
+ ================================================================================
+ Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<!-- Defines how we build the .zip file which is our distribution. -->
+
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>feature-distributed-locking</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>lib/feature</outputDirectory>
+ <includes>
+ <include>feature-distributed-locking-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>target/assembly/lib</directory>
+ <outputDirectory>lib/dependencies</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/config</directory>
+ <outputDirectory>config</outputDirectory>
+ <fileMode>0644</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/db</directory>
+ <outputDirectory>db</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/install</directory>
+ <outputDirectory>install</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ </fileSets>
+
+</assembly>
diff --git a/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties
new file mode 100644
index 00000000..ee4aa474
--- /dev/null
+++ b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties
@@ -0,0 +1,34 @@
+###
+# ============LICENSE_START=======================================================
+ # feature-distributed-locking
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+#Database properties
+#javax.persistence.jdbc.driver= org.mariadb.jdbc.Driver
+#javax.persistence.jdbc.url=jdbc:mariadb://${{SQL_HOST}}:3306/locks
+#javax.persistence.jdbc.user=${{SQL_USER}}
+#javax.persistence.jdbc.password=${{SQL_PASSWORD}}
+
+#This value is added to System.currentTimeMs to
+#set expirationTime when a lock is obtained.
+#distributed.locking.lock.aging=1000
+
+#The frequency (in milliseconds) that the heartbeat
+#thread refreshes locks owned by the current host
+#distributed.locking.heartbeat.interval=5000
+
diff --git a/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql
new file mode 100644
index 00000000..cd1b815d
--- /dev/null
+++ b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql
@@ -0,0 +1,20 @@
+# ============LICENSE_START=======================================================
+# feature-distributed-locking
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+use pooling;
+drop table if exists locks; \ No newline at end of file
diff --git a/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql
new file mode 100644
index 00000000..be56d35e
--- /dev/null
+++ b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql
@@ -0,0 +1,23 @@
+# ============LICENSE_START=======================================================
+# feature-distributed-locking
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+ set foreign_key_checks=0;
+
+ CREATE TABLE if not exists pooling.locks (resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), expirationTime BIGINT, PRIMARY KEY (resourceId), INDEX idx_expirationTime(expirationTime), INDEX idx_host(host));
+
+ set foreign_key_checks=1; \ No newline at end of file
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
new file mode 100644
index 00000000..cc7a7a12
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
@@ -0,0 +1,158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.drools.core.lock.LockRequestFuture;
+import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI;
+import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
+import org.onap.policy.drools.persistence.SystemPersistence;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedLockingFeature implements PolicyEngineFeatureAPI, PolicyResourceLockFeatureAPI {
+
+ /**
+ * Logger instance
+ */
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
+
+ /**
+ * Properties Configuration Name
+ */
+ public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+
+ /**
+ * Properties for locking feature
+ */
+ private DistributedLockingProperties lockProps;
+
+ /**
+ *ScheduledExecutorService for LockHeartbeat
+ */
+ private ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * UUID
+ */
+ private static final UUID uuid = UUID.randomUUID();
+
+ /**
+ * Config directory
+ */
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) {
+
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+
+ return new LockRequestFuture(resourceId, owner, tLock.lock());
+
+ }
+
+ @Override
+ public Boolean beforeUnlock(String resourceId, String owner) {
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+
+ return tLock.unlock();
+ }
+
+ @Override
+ public Boolean beforeIsLockedBy(String resourceId, String owner) {
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+
+ return tLock.isActive();
+ }
+
+ @Override
+ public Boolean beforeIsLocked(String resourceId) {
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, "dummyOwner", lockProps);
+
+ return tLock.isLocked();
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ this.lockProps = new DistributedLockingProperties(SystemPersistence.manager.getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
+ } catch (PropertyException e) {
+ logger.error("DistributedLockingFeature feature properies have not been loaded", e);
+ throw new DistributedLockingFeatureException(e);
+ }
+
+ long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty();
+
+ cleanLockTable();
+ Heartbeat heartbeat = new Heartbeat(this.uuid, lockProps);
+
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
+ return false;
+ }
+
+ /**
+ * This method kills the heartbeat thread and calls refreshLockTable which removes
+ * any records from the db where the current host is the owner.
+ */
+ @Override
+ public boolean beforeShutdown(PolicyEngine engine) {
+ scheduledExecutorService.shutdown();
+ cleanLockTable();
+ return false;
+ }
+
+ /**
+ * This method removes all records owned by the current host from the db.
+ */
+ private void cleanLockTable() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(),
+ lockProps.getDbUser(),
+ lockProps.getDbPwd());
+ PreparedStatement statement = conn.prepareStatement("DELETE FROM pooling.locks WHERE host = ? OR expirationTime < ?");
+ ){
+
+ statement.setString(1, this.uuid.toString());
+ statement.setLong(2, System.currentTimeMillis());
+ statement.executeUpdate();
+
+ } catch (SQLException e) {
+ logger.error("error in refreshLockTable()", e);
+ }
+
+ }
+
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
new file mode 100644
index 00000000..f28ccbc9
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+public class DistributedLockingFeatureException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ *
+ * @param e
+ * exception to be wrapped
+ */
+ public DistributedLockingFeatureException(Exception e) {
+ super(e);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
new file mode 100644
index 00000000..139bfb7b
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.util.Properties;
+
+import org.onap.policy.common.utils.properties.PropertyConfiguration;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DistributedLockingProperties extends PropertyConfiguration{
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockingProperties.class);
+
+ /**
+ * Feature properties all begin with this prefix.
+ */
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PWD = "javax.persistence.jdbc.password";
+ public static final String AGING_PROPERTY = PREFIX + "lock.aging";
+ public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval";
+
+ /**
+ * Properties from which this was constructed.
+ */
+ private Properties source;
+
+ /**
+ * Database driver
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password
+ */
+ @Property(name = DB_PWD)
+ private String dbPwd;
+
+ /**
+ * Used to set expiration time for lock.
+ */
+ @Property(name = AGING_PROPERTY, defaultValue = "300000")
+ private long agingProperty;
+
+ /**
+ * Indicates intervals at which we refresh locks.
+ */
+ @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000")
+ private long heartBeatIntervalProperty;
+
+ public DistributedLockingProperties(Properties props) throws PropertyException {
+ super(props);
+ source = props;
+ }
+
+
+ public Properties getSource() {
+ return source;
+ }
+
+
+ public String getDbDriver() {
+ return dbDriver;
+ }
+
+
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+
+ public String getDbUser() {
+ return dbUser;
+ }
+
+
+ public String getDbPwd() {
+ return dbPwd;
+ }
+
+
+ public long getAgingProperty() {
+ return agingProperty;
+ }
+
+
+ public long getHeartBeatIntervalProperty() {
+ return heartBeatIntervalProperty;
+ }
+
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
new file mode 100644
index 00000000..c753dba9
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.onap.policy.drools.utils.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * This runnable class scans the locks table for all locks owned by this host.
+ * It refreshes the expiration time of each lock using the locking.distributed.aging
+ * property
+ *
+ */
+public class Heartbeat implements Runnable{
+
+ private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class);
+
+ /**
+ * Properties object containing properties needed by class
+ */
+ private DistributedLockingProperties lockProps;
+
+ /**
+ * UUID
+ */
+ private UUID uuid;
+
+ public Heartbeat(UUID uuid, DistributedLockingProperties lockProps) {
+ this.lockProps = lockProps;
+ this.uuid = uuid;
+ }
+
+ @Override
+ public void run() {
+
+ long expirationAge = lockProps.getAgingProperty();
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+ PreparedStatement statement = conn
+ .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) {
+
+ statement.setLong(1, System.currentTimeMillis() + expirationAge);
+ statement.setString(2, this.uuid.toString());
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ logger.error("error in Heartbeat.run()", e);
+ }
+
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
new file mode 100644
index 00000000..ceaa849f
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TargetLock {
+
+ private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
+
+ /**
+ * The Target resource we want to lock
+ */
+ private String resourceId;
+
+ /**
+ * Properties object containing properties needed by class
+ */
+ private DistributedLockingProperties lockProps;
+
+ /**
+ * UUID
+ */
+ private UUID uuid;
+
+ /**
+ * Owner
+ */
+ private String owner;
+
+ /**
+ * Constructs a TargetLock object.
+ *
+ * @param resourceId ID of the entity we want to lock
+ * @param lockProps Properties object containing properties needed by class
+ */
+ public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps) {
+ this.resourceId = resourceId;
+ this.uuid = uuid;
+ this.owner = owner;
+ this.lockProps = lockProps;
+ }
+
+ /**
+ * obtain a lock
+ */
+ public boolean lock() {
+
+ return grabLock();
+ }
+
+ /**
+ * Unlock a resource by deleting it's associated record in the db
+ */
+ public boolean unlock() {
+ return deleteLock();
+ }
+
+ /**
+ * "Grabs" lock by attempting to insert a new record in the db.
+ * If the insert fails due to duplicate key error resource is already locked
+ * so we call secondGrab.
+ */
+ private boolean grabLock() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ // try to insert a record into the table(thereby grabbing the lock)
+ PreparedStatement statement = conn
+ .prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
+ statement.setString(1, this.resourceId);
+ statement.setString(2, this.uuid.toString());
+ statement.setString(3, this.owner);
+ statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.grabLock()", e);
+ return secondGrab();
+ }
+
+ return true;
+ }
+
+ /**
+ * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
+ * If that fails, it attempts to insert a new record again
+ */
+ private boolean secondGrab() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?");
+
+ PreparedStatement insertStatement = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
+
+ updateStatement.setString(1, this.uuid.toString());
+ updateStatement.setString(2, this.owner);
+ updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty());
+ updateStatement.setLong(4, System.currentTimeMillis());
+ updateStatement.setString(5, this.resourceId);
+
+ // The lock was expired and we grabbed it.
+ // return true
+ if (updateStatement.executeUpdate() == 1) {
+ return true;
+ }
+ // If our update does not return 1 row, the lock either has not expired
+ // or it was removed. Try one last grab
+ else {
+ insertStatement.setString(1, this.resourceId);
+ insertStatement.setString(2, this.uuid.toString());
+ insertStatement.setString(3, this.owner);
+ insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+
+ // If our insert returns 1 we successfully grabbed the lock
+ return (insertStatement.executeUpdate() == 1);
+ }
+
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.secondGrab()", e);
+ return false;
+ }
+
+ }
+
+ /**
+ *To remove a lock we simply delete the record from the db
+ */
+ private boolean deleteLock() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement deleteStatement = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?");) {
+
+ deleteStatement.setString(1, this.resourceId);
+ deleteStatement.setString(2, this.owner);
+ deleteStatement.setString(3, this.uuid.toString());
+
+ return (deleteStatement.executeUpdate() == 1);
+
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.deleteLock()", e);
+ return false;
+ }
+
+ }
+
+ /**
+ * Is the lock active
+ */
+ public boolean isActive() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement selectStatement = conn
+ .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= ?");) {
+ {
+ selectStatement.setString(1, this.resourceId);
+ selectStatement.setString(2, this.uuid.toString());
+ selectStatement.setString(3, this.owner);
+ selectStatement.setLong(4, System.currentTimeMillis());
+
+ ResultSet result = selectStatement.executeQuery();
+
+ // This will return true if the
+ // query returned at least one row
+ return result.first();
+
+ }
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.isActive()", e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the resource locked
+ */
+ public boolean isLocked() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement selectStatement = conn
+ .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= ?");) {
+ {
+ selectStatement.setString(1, this.resourceId);
+ selectStatement.setLong(2, System.currentTimeMillis());
+ ResultSet result = selectStatement.executeQuery();
+
+ // This will return true if the
+ // query returned at least one row
+ return result.first();
+
+ }
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.isActive()", e);
+ return false;
+ }
+ }
+
+}
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI
new file mode 100644
index 00000000..19bdf505
--- /dev/null
+++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI
@@ -0,0 +1 @@
+org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
new file mode 100644
index 00000000..19bdf505
--- /dev/null
+++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
@@ -0,0 +1 @@
+org.onap.policy.distributed.locking.DistributedLockingFeature \ No newline at end of file
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java
new file mode 100644
index 00000000..ea53e522
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.distributed.locking.DistributedLockingFeatureException;
+
+public class DistributedLockingFeatureExceptionTest extends ExceptionsTester{
+
+ @Test
+ public void test() {
+ assertEquals(1, test(DistributedLockingFeatureException.class));
+ }
+
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java
new file mode 100644
index 00000000..e624afb9
--- /dev/null
+++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java
@@ -0,0 +1,220 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking.test;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.distributed.locking.DistributedLockingFeature;
+import org.onap.policy.drools.persistence.SystemPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class TargetLockTest {
+ private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class);
+ private static final String DB_CONNECTION = "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
+ private static final String DB_USER = "user";
+ private static final String DB_PASSWORD = "password";
+ private static Connection conn = null;
+ private static DistributedLockingFeature distLockFeat;
+
+ @BeforeClass
+ public static void setup() {
+ getDBConnection();
+ createTable();
+ SystemPersistence.manager.setConfigurationDir("src/test/resources");
+ distLockFeat = new DistributedLockingFeature();
+ distLockFeat.afterStart(null);
+
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ distLockFeat.beforeShutdown(null);
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ logger.error("Error in TargetLockTest.cleanUp()", e);
+ }
+ }
+
+ @Before
+ public void wipeDb() {
+
+ try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks");){
+ lockDelete.executeUpdate();
+ } catch (SQLException e) {
+ logger.error("Error in TargetLockTest.wipeDb()", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Test
+ public void testGrabLockSuccess() throws InterruptedException, ExecutionException {
+ assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get());
+
+ //attempt to grab expiredLock
+ try (PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE resourceId = ?");)
+ {
+ updateStatement.setLong(1, System.currentTimeMillis() - 1000);
+ updateStatement.setString(2, "resource1");
+ updateStatement.executeUpdate();
+
+ } catch (SQLException e) {
+ logger.error("Error in TargetLockTest.testGrabLockSuccess()", e);
+ throw new RuntimeException(e);
+ }
+
+ assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get());
+ }
+
+ @Test
+ public void testExpiredLocks() throws InterruptedException, ExecutionException {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ distLockFeat.beforeLock("resource1", "owner1", null);
+
+ try {
+ latch.await(1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Error in testExpiredLocks", e);
+ }
+
+ //Heartbeat should keep it active
+ assertFalse(distLockFeat.beforeLock("resource1", "owner1", null).get());
+ }
+
+ @Test
+ public void testGrabLockFail() throws InterruptedException, ExecutionException {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ distLockFeat.beforeLock("resource1", "owner1", null);
+
+ try {
+ latch.await(10, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Error in testExpiredLocks", e);
+ }
+ assertFalse(distLockFeat.beforeLock("resource1", "owner1", null).get());
+
+ }
+
+
+ @Test
+ public void testUnlock() throws InterruptedException, ExecutionException {
+ distLockFeat.beforeLock("resource1", "owner1", null);
+
+ assertTrue(distLockFeat.beforeUnlock("resource1", "owner1"));
+ assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get());
+ }
+
+ @Test
+ public void testIsActive() {
+ assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+ distLockFeat.beforeLock("resource1", "owner1", null);
+ assertTrue(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+ assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner2"));
+
+ // isActive on expiredLock
+ try (PreparedStatement updateStatement = conn
+ .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE resourceId = ?");) {
+ updateStatement.setLong(1, System.currentTimeMillis() - 5000);
+ updateStatement.setString(2, "resource1");
+ updateStatement.executeUpdate();
+
+ } catch (SQLException e) {
+ logger.error("Error in TargetLockTest.testIsActive()", e);
+ throw new RuntimeException(e);
+ }
+
+ assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+
+ distLockFeat.beforeLock("resource1", "owner1", null);
+ //Unlock record, next isActive attempt should fail
+ distLockFeat.beforeUnlock("resource1", "owner1");
+ assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+
+ }
+
+ @Test
+ public void testHeartbeat() {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ distLockFeat.beforeLock("resource1", "owner1", null);
+ try {
+ latch.await(1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Error in testExpiredLocks", e);
+ }
+
+ // This test always returns true.
+ assertTrue(distLockFeat.beforeIsLocked("resource1"));
+ }
+
+ @Test
+ public void unlockBeforeLock() {
+ assertFalse(distLockFeat.beforeUnlock("resource1", "owner1"));
+ distLockFeat.beforeLock("resource1", "owner1", null);
+ assertTrue(distLockFeat.beforeUnlock("resource1", "owner1"));
+ assertFalse(distLockFeat.beforeUnlock("resource1", "owner1"));
+ }
+
+ @Test
+ public void testIsLocked() {
+ assertFalse(distLockFeat.beforeIsLocked("resource1"));
+ distLockFeat.beforeLock("resource1", "owner1", null);
+ assertTrue(distLockFeat.beforeIsLocked("resource1"));
+
+ }
+
+ private static void getDBConnection() {
+ try {
+ conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+ } catch (SQLException e) {
+ logger.error("Error in TargetLockTest.getDBConnection()", e);
+ }
+ }
+
+ private static void createTable() {
+ String createString = "create table if not exists pooling.locks (resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), expirationTime BIGINT, PRIMARY KEY (resourceId))";
+ try (PreparedStatement createStmt = conn.prepareStatement(createString);) {
+ createStmt.executeUpdate();
+
+ } catch (SQLException e) {
+ logger.error("Error in TargetLockTest.createTable()", e);
+ }
+ }
+
+
+}
diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
new file mode 100644
index 00000000..d1a07e82
--- /dev/null
+++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
@@ -0,0 +1,26 @@
+###
+# ============LICENSE_START=======================================================
+ # feature-distributed-locking
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+javax.persistence.jdbc.driver=org.h2.Driver
+javax.persistence.jdbc.url=jdbc:h2:mem:pooling
+javax.persistence.jdbc.user=user
+javax.persistence.jdbc.password=password
+distributed.locking.lock.aging=150
+distributed.locking.heartbeat.interval=500 \ No newline at end of file