summaryrefslogtreecommitdiffstats
path: root/feature-distributed-locking/src/main/java
diff options
context:
space:
mode:
authorJorge Hernandez <jh1730@att.com>2018-04-03 21:31:12 +0000
committerGerrit Code Review <gerrit@onap.org>2018-04-03 21:31:12 +0000
commit06018cabcdcdc96bbf1a186b1de9329f824612ce (patch)
tree327868a5f815b48e9403f840afb6c6e142a4b201 /feature-distributed-locking/src/main/java
parent7c98e9950cb20c986ff7a319baad808f15262864 (diff)
parentfff9b57f7411deb798431bd625944fcfdbe053ac (diff)
Merge "Implementation of distributed locking feature"
Diffstat (limited to 'feature-distributed-locking/src/main/java')
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java158
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java34
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java127
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java78
-rw-r--r--feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java233
5 files changed, 630 insertions, 0 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
new file mode 100644
index 00000000..cc7a7a12
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
@@ -0,0 +1,158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.drools.core.lock.LockRequestFuture;
+import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI;
+import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
+import org.onap.policy.drools.persistence.SystemPersistence;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedLockingFeature implements PolicyEngineFeatureAPI, PolicyResourceLockFeatureAPI {
+
+ /**
+ * Logger instance
+ */
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
+
+ /**
+ * Properties Configuration Name
+ */
+ public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+
+ /**
+ * Properties for locking feature
+ */
+ private DistributedLockingProperties lockProps;
+
+ /**
+ *ScheduledExecutorService for LockHeartbeat
+ */
+ private ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * UUID
+ */
+ private static final UUID uuid = UUID.randomUUID();
+
+ /**
+ * Config directory
+ */
+ @Override
+ public int getSequenceNumber() {
+ return 1000;
+ }
+
+ @Override
+ public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) {
+
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+
+ return new LockRequestFuture(resourceId, owner, tLock.lock());
+
+ }
+
+ @Override
+ public Boolean beforeUnlock(String resourceId, String owner) {
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+
+ return tLock.unlock();
+ }
+
+ @Override
+ public Boolean beforeIsLockedBy(String resourceId, String owner) {
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+
+ return tLock.isActive();
+ }
+
+ @Override
+ public Boolean beforeIsLocked(String resourceId) {
+ TargetLock tLock = new TargetLock(resourceId, this.uuid, "dummyOwner", lockProps);
+
+ return tLock.isLocked();
+ }
+
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+
+ try {
+ this.lockProps = new DistributedLockingProperties(SystemPersistence.manager.getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
+ } catch (PropertyException e) {
+ logger.error("DistributedLockingFeature feature properies have not been loaded", e);
+ throw new DistributedLockingFeatureException(e);
+ }
+
+ long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty();
+
+ cleanLockTable();
+ Heartbeat heartbeat = new Heartbeat(this.uuid, lockProps);
+
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
+ return false;
+ }
+
+ /**
+ * This method kills the heartbeat thread and calls refreshLockTable which removes
+ * any records from the db where the current host is the owner.
+ */
+ @Override
+ public boolean beforeShutdown(PolicyEngine engine) {
+ scheduledExecutorService.shutdown();
+ cleanLockTable();
+ return false;
+ }
+
+ /**
+ * This method removes all records owned by the current host from the db.
+ */
+ private void cleanLockTable() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(),
+ lockProps.getDbUser(),
+ lockProps.getDbPwd());
+ PreparedStatement statement = conn.prepareStatement("DELETE FROM pooling.locks WHERE host = ? OR expirationTime < ?");
+ ){
+
+ statement.setString(1, this.uuid.toString());
+ statement.setLong(2, System.currentTimeMillis());
+ statement.executeUpdate();
+
+ } catch (SQLException e) {
+ logger.error("error in refreshLockTable()", e);
+ }
+
+ }
+
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
new file mode 100644
index 00000000..f28ccbc9
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+public class DistributedLockingFeatureException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ *
+ * @param e
+ * exception to be wrapped
+ */
+ public DistributedLockingFeatureException(Exception e) {
+ super(e);
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
new file mode 100644
index 00000000..139bfb7b
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.util.Properties;
+
+import org.onap.policy.common.utils.properties.PropertyConfiguration;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DistributedLockingProperties extends PropertyConfiguration{
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLockingProperties.class);
+
+ /**
+ * Feature properties all begin with this prefix.
+ */
+ public static final String PREFIX = "distributed.locking.";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PWD = "javax.persistence.jdbc.password";
+ public static final String AGING_PROPERTY = PREFIX + "lock.aging";
+ public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval";
+
+ /**
+ * Properties from which this was constructed.
+ */
+ private Properties source;
+
+ /**
+ * Database driver
+ */
+ @Property(name = DB_DRIVER)
+ private String dbDriver;
+
+ /**
+ * Database url
+ */
+ @Property(name = DB_URL)
+ private String dbUrl;
+
+ /**
+ * Database user
+ */
+ @Property(name = DB_USER)
+ private String dbUser;
+
+ /**
+ * Database password
+ */
+ @Property(name = DB_PWD)
+ private String dbPwd;
+
+ /**
+ * Used to set expiration time for lock.
+ */
+ @Property(name = AGING_PROPERTY, defaultValue = "300000")
+ private long agingProperty;
+
+ /**
+ * Indicates intervals at which we refresh locks.
+ */
+ @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000")
+ private long heartBeatIntervalProperty;
+
+ public DistributedLockingProperties(Properties props) throws PropertyException {
+ super(props);
+ source = props;
+ }
+
+
+ public Properties getSource() {
+ return source;
+ }
+
+
+ public String getDbDriver() {
+ return dbDriver;
+ }
+
+
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+
+ public String getDbUser() {
+ return dbUser;
+ }
+
+
+ public String getDbPwd() {
+ return dbPwd;
+ }
+
+
+ public long getAgingProperty() {
+ return agingProperty;
+ }
+
+
+ public long getHeartBeatIntervalProperty() {
+ return heartBeatIntervalProperty;
+ }
+
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
new file mode 100644
index 00000000..c753dba9
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.onap.policy.drools.utils.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * This runnable class scans the locks table for all locks owned by this host.
+ * It refreshes the expiration time of each lock using the locking.distributed.aging
+ * property
+ *
+ */
+public class Heartbeat implements Runnable{
+
+ private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class);
+
+ /**
+ * Properties object containing properties needed by class
+ */
+ private DistributedLockingProperties lockProps;
+
+ /**
+ * UUID
+ */
+ private UUID uuid;
+
+ public Heartbeat(UUID uuid, DistributedLockingProperties lockProps) {
+ this.lockProps = lockProps;
+ this.uuid = uuid;
+ }
+
+ @Override
+ public void run() {
+
+ long expirationAge = lockProps.getAgingProperty();
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+ PreparedStatement statement = conn
+ .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) {
+
+ statement.setLong(1, System.currentTimeMillis() + expirationAge);
+ statement.setString(2, this.uuid.toString());
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ logger.error("error in Heartbeat.run()", e);
+ }
+
+ }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
new file mode 100644
index 00000000..ceaa849f
--- /dev/null
+++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TargetLock {
+
+ private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
+
+ /**
+ * The Target resource we want to lock
+ */
+ private String resourceId;
+
+ /**
+ * Properties object containing properties needed by class
+ */
+ private DistributedLockingProperties lockProps;
+
+ /**
+ * UUID
+ */
+ private UUID uuid;
+
+ /**
+ * Owner
+ */
+ private String owner;
+
+ /**
+ * Constructs a TargetLock object.
+ *
+ * @param resourceId ID of the entity we want to lock
+ * @param lockProps Properties object containing properties needed by class
+ */
+ public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps) {
+ this.resourceId = resourceId;
+ this.uuid = uuid;
+ this.owner = owner;
+ this.lockProps = lockProps;
+ }
+
+ /**
+ * obtain a lock
+ */
+ public boolean lock() {
+
+ return grabLock();
+ }
+
+ /**
+ * Unlock a resource by deleting it's associated record in the db
+ */
+ public boolean unlock() {
+ return deleteLock();
+ }
+
+ /**
+ * "Grabs" lock by attempting to insert a new record in the db.
+ * If the insert fails due to duplicate key error resource is already locked
+ * so we call secondGrab.
+ */
+ private boolean grabLock() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ // try to insert a record into the table(thereby grabbing the lock)
+ PreparedStatement statement = conn
+ .prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
+ statement.setString(1, this.resourceId);
+ statement.setString(2, this.uuid.toString());
+ statement.setString(3, this.owner);
+ statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.grabLock()", e);
+ return secondGrab();
+ }
+
+ return true;
+ }
+
+ /**
+ * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
+ * If that fails, it attempts to insert a new record again
+ */
+ private boolean secondGrab() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?");
+
+ PreparedStatement insertStatement = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
+
+ updateStatement.setString(1, this.uuid.toString());
+ updateStatement.setString(2, this.owner);
+ updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty());
+ updateStatement.setLong(4, System.currentTimeMillis());
+ updateStatement.setString(5, this.resourceId);
+
+ // The lock was expired and we grabbed it.
+ // return true
+ if (updateStatement.executeUpdate() == 1) {
+ return true;
+ }
+ // If our update does not return 1 row, the lock either has not expired
+ // or it was removed. Try one last grab
+ else {
+ insertStatement.setString(1, this.resourceId);
+ insertStatement.setString(2, this.uuid.toString());
+ insertStatement.setString(3, this.owner);
+ insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+
+ // If our insert returns 1 we successfully grabbed the lock
+ return (insertStatement.executeUpdate() == 1);
+ }
+
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.secondGrab()", e);
+ return false;
+ }
+
+ }
+
+ /**
+ *To remove a lock we simply delete the record from the db
+ */
+ private boolean deleteLock() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement deleteStatement = conn
+ .prepareStatement("DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?");) {
+
+ deleteStatement.setString(1, this.resourceId);
+ deleteStatement.setString(2, this.owner);
+ deleteStatement.setString(3, this.uuid.toString());
+
+ return (deleteStatement.executeUpdate() == 1);
+
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.deleteLock()", e);
+ return false;
+ }
+
+ }
+
+ /**
+ * Is the lock active
+ */
+ public boolean isActive() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement selectStatement = conn
+ .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= ?");) {
+ {
+ selectStatement.setString(1, this.resourceId);
+ selectStatement.setString(2, this.uuid.toString());
+ selectStatement.setString(3, this.owner);
+ selectStatement.setLong(4, System.currentTimeMillis());
+
+ ResultSet result = selectStatement.executeQuery();
+
+ // This will return true if the
+ // query returned at least one row
+ return result.first();
+
+ }
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.isActive()", e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the resource locked
+ */
+ public boolean isLocked() {
+
+ try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+ lockProps.getDbPwd());
+
+ PreparedStatement selectStatement = conn
+ .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= ?");) {
+ {
+ selectStatement.setString(1, this.resourceId);
+ selectStatement.setLong(2, System.currentTimeMillis());
+ ResultSet result = selectStatement.executeQuery();
+
+ // This will return true if the
+ // query returned at least one row
+ return result.first();
+
+ }
+ } catch (SQLException e) {
+ logger.error("error in TargetLock.isActive()", e);
+ return false;
+ }
+ }
+
+}