diff options
author | Jorge Hernandez <jh1730@att.com> | 2018-04-03 21:31:12 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-04-03 21:31:12 +0000 |
commit | 06018cabcdcdc96bbf1a186b1de9329f824612ce (patch) | |
tree | 327868a5f815b48e9403f840afb6c6e142a4b201 /feature-distributed-locking/src/main/java | |
parent | 7c98e9950cb20c986ff7a319baad808f15262864 (diff) | |
parent | fff9b57f7411deb798431bd625944fcfdbe053ac (diff) |
Merge "Implementation of distributed locking feature"
Diffstat (limited to 'feature-distributed-locking/src/main/java')
5 files changed, 630 insertions, 0 deletions
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java new file mode 100644 index 00000000..cc7a7a12 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.onap.policy.drools.core.lock.LockRequestFuture; +import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI; +import org.onap.policy.drools.features.PolicyEngineFeatureAPI; +import org.onap.policy.drools.persistence.SystemPersistence; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistributedLockingFeature implements PolicyEngineFeatureAPI, PolicyResourceLockFeatureAPI { + + /** + * Logger instance + */ + private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class); + + /** + * Properties Configuration Name + */ + public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; + + /** + * Properties for locking feature + */ + private DistributedLockingProperties lockProps; + + /** + *ScheduledExecutorService for LockHeartbeat + */ + private ScheduledExecutorService scheduledExecutorService; + + /** + * UUID + */ + private static final UUID uuid = UUID.randomUUID(); + + /** + * Config directory + */ + @Override + public int getSequenceNumber() { + return 1000; + } + + @Override + public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) { + + TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps); + + return new LockRequestFuture(resourceId, owner, tLock.lock()); + + } + + @Override + public Boolean beforeUnlock(String resourceId, String owner) { + TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps); + + return tLock.unlock(); + } + + @Override + public Boolean beforeIsLockedBy(String resourceId, String owner) { + TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps); + + return tLock.isActive(); + } + + @Override + public Boolean beforeIsLocked(String resourceId) { + TargetLock tLock = new TargetLock(resourceId, this.uuid, "dummyOwner", lockProps); + + return tLock.isLocked(); + } + + @Override + public boolean afterStart(PolicyEngine engine) { + + try { + this.lockProps = new DistributedLockingProperties(SystemPersistence.manager.getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME)); + } catch (PropertyException e) { + logger.error("DistributedLockingFeature feature properies have not been loaded", e); + throw new DistributedLockingFeatureException(e); + } + + long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty(); + + cleanLockTable(); + Heartbeat heartbeat = new Heartbeat(this.uuid, lockProps); + + this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); + return false; + } + + /** + * This method kills the heartbeat thread and calls refreshLockTable which removes + * any records from the db where the current host is the owner. + */ + @Override + public boolean beforeShutdown(PolicyEngine engine) { + scheduledExecutorService.shutdown(); + cleanLockTable(); + return false; + } + + /** + * This method removes all records owned by the current host from the db. + */ + private void cleanLockTable() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), + lockProps.getDbUser(), + lockProps.getDbPwd()); + PreparedStatement statement = conn.prepareStatement("DELETE FROM pooling.locks WHERE host = ? OR expirationTime < ?"); + ){ + + statement.setString(1, this.uuid.toString()); + statement.setLong(2, System.currentTimeMillis()); + statement.executeUpdate(); + + } catch (SQLException e) { + logger.error("error in refreshLockTable()", e); + } + + } + +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java new file mode 100644 index 00000000..f28ccbc9 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +public class DistributedLockingFeatureException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** + * + * @param e + * exception to be wrapped + */ + public DistributedLockingFeatureException(Exception e) { + super(e); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java new file mode 100644 index 00000000..139bfb7b --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.distributed.locking; + +import java.util.Properties; + +import org.onap.policy.common.utils.properties.PropertyConfiguration; +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DistributedLockingProperties extends PropertyConfiguration{ + + private static final Logger logger = LoggerFactory.getLogger(DistributedLockingProperties.class); + + /** + * Feature properties all begin with this prefix. + */ + public static final String PREFIX = "distributed.locking."; + + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PWD = "javax.persistence.jdbc.password"; + public static final String AGING_PROPERTY = PREFIX + "lock.aging"; + public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval"; + + /** + * Properties from which this was constructed. + */ + private Properties source; + + /** + * Database driver + */ + @Property(name = DB_DRIVER) + private String dbDriver; + + /** + * Database url + */ + @Property(name = DB_URL) + private String dbUrl; + + /** + * Database user + */ + @Property(name = DB_USER) + private String dbUser; + + /** + * Database password + */ + @Property(name = DB_PWD) + private String dbPwd; + + /** + * Used to set expiration time for lock. + */ + @Property(name = AGING_PROPERTY, defaultValue = "300000") + private long agingProperty; + + /** + * Indicates intervals at which we refresh locks. + */ + @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000") + private long heartBeatIntervalProperty; + + public DistributedLockingProperties(Properties props) throws PropertyException { + super(props); + source = props; + } + + + public Properties getSource() { + return source; + } + + + public String getDbDriver() { + return dbDriver; + } + + + public String getDbUrl() { + return dbUrl; + } + + + public String getDbUser() { + return dbUser; + } + + + public String getDbPwd() { + return dbPwd; + } + + + public long getAgingProperty() { + return agingProperty; + } + + + public long getHeartBeatIntervalProperty() { + return heartBeatIntervalProperty; + } + +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java new file mode 100644 index 00000000..c753dba9 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Properties; +import java.util.UUID; + +import org.onap.policy.drools.utils.NetworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * This runnable class scans the locks table for all locks owned by this host. + * It refreshes the expiration time of each lock using the locking.distributed.aging + * property + * + */ +public class Heartbeat implements Runnable{ + + private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class); + + /** + * Properties object containing properties needed by class + */ + private DistributedLockingProperties lockProps; + + /** + * UUID + */ + private UUID uuid; + + public Heartbeat(UUID uuid, DistributedLockingProperties lockProps) { + this.lockProps = lockProps; + this.uuid = uuid; + } + + @Override + public void run() { + + long expirationAge = lockProps.getAgingProperty(); + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + PreparedStatement statement = conn + .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) { + + statement.setLong(1, System.currentTimeMillis() + expirationAge); + statement.setString(2, this.uuid.toString()); + statement.executeUpdate(); + } catch (SQLException e) { + logger.error("error in Heartbeat.run()", e); + } + + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java new file mode 100644 index 00000000..ceaa849f --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java @@ -0,0 +1,233 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TargetLock { + + private static final Logger logger = LoggerFactory.getLogger(TargetLock.class); + + /** + * The Target resource we want to lock + */ + private String resourceId; + + /** + * Properties object containing properties needed by class + */ + private DistributedLockingProperties lockProps; + + /** + * UUID + */ + private UUID uuid; + + /** + * Owner + */ + private String owner; + + /** + * Constructs a TargetLock object. + * + * @param resourceId ID of the entity we want to lock + * @param lockProps Properties object containing properties needed by class + */ + public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps) { + this.resourceId = resourceId; + this.uuid = uuid; + this.owner = owner; + this.lockProps = lockProps; + } + + /** + * obtain a lock + */ + public boolean lock() { + + return grabLock(); + } + + /** + * Unlock a resource by deleting it's associated record in the db + */ + public boolean unlock() { + return deleteLock(); + } + + /** + * "Grabs" lock by attempting to insert a new record in the db. + * If the insert fails due to duplicate key error resource is already locked + * so we call secondGrab. + */ + private boolean grabLock() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + // try to insert a record into the table(thereby grabbing the lock) + PreparedStatement statement = conn + .prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) { + statement.setString(1, this.resourceId); + statement.setString(2, this.uuid.toString()); + statement.setString(3, this.owner); + statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty()); + + statement.executeUpdate(); + } catch (SQLException e) { + logger.error("error in TargetLock.grabLock()", e); + return secondGrab(); + } + + return true; + } + + /** + * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired. + * If that fails, it attempts to insert a new record again + */ + private boolean secondGrab() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?"); + + PreparedStatement insertStatement = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) { + + updateStatement.setString(1, this.uuid.toString()); + updateStatement.setString(2, this.owner); + updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty()); + updateStatement.setLong(4, System.currentTimeMillis()); + updateStatement.setString(5, this.resourceId); + + // The lock was expired and we grabbed it. + // return true + if (updateStatement.executeUpdate() == 1) { + return true; + } + // If our update does not return 1 row, the lock either has not expired + // or it was removed. Try one last grab + else { + insertStatement.setString(1, this.resourceId); + insertStatement.setString(2, this.uuid.toString()); + insertStatement.setString(3, this.owner); + insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty()); + + // If our insert returns 1 we successfully grabbed the lock + return (insertStatement.executeUpdate() == 1); + } + + } catch (SQLException e) { + logger.error("error in TargetLock.secondGrab()", e); + return false; + } + + } + + /** + *To remove a lock we simply delete the record from the db + */ + private boolean deleteLock() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement deleteStatement = conn + .prepareStatement("DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?");) { + + deleteStatement.setString(1, this.resourceId); + deleteStatement.setString(2, this.owner); + deleteStatement.setString(3, this.uuid.toString()); + + return (deleteStatement.executeUpdate() == 1); + + } catch (SQLException e) { + logger.error("error in TargetLock.deleteLock()", e); + return false; + } + + } + + /** + * Is the lock active + */ + public boolean isActive() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement selectStatement = conn + .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= ?");) { + { + selectStatement.setString(1, this.resourceId); + selectStatement.setString(2, this.uuid.toString()); + selectStatement.setString(3, this.owner); + selectStatement.setLong(4, System.currentTimeMillis()); + + ResultSet result = selectStatement.executeQuery(); + + // This will return true if the + // query returned at least one row + return result.first(); + + } + } catch (SQLException e) { + logger.error("error in TargetLock.isActive()", e); + return false; + } + } + + /** + * Is the resource locked + */ + public boolean isLocked() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement selectStatement = conn + .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= ?");) { + { + selectStatement.setString(1, this.resourceId); + selectStatement.setLong(2, System.currentTimeMillis()); + ResultSet result = selectStatement.executeQuery(); + + // This will return true if the + // query returned at least one row + return result.first(); + + } + } catch (SQLException e) { + logger.error("error in TargetLock.isActive()", e); + return false; + } + } + +} |