diff options
author | Magnusen, Drew (dm741q) <dm741q@att.com> | 2018-03-21 16:44:45 -0500 |
---|---|---|
committer | Magnusen, Drew (dm741q) <dm741q@att.com> | 2018-04-03 14:05:18 -0500 |
commit | fff9b57f7411deb798431bd625944fcfdbe053ac (patch) | |
tree | c1d7b2d23df54a61a15cd0804f7cce3b42c527f7 | |
parent | 54bc3867539264a518c88772e82ea8070ef97c79 (diff) |
Implementation of distributed locking feature
This feature is a very basic implementation of a distributed locking
system.
Issue-ID: POLICY-699
Change-Id: I012fd37926ccbbdd87a3e4acb2788b53680115f0
Signed-off-by: Magnusen, Drew (dm741q) <dm741q@att.com>
18 files changed, 1648 insertions, 0 deletions
diff --git a/feature-distributed-locking/pom.xml b/feature-distributed-locking/pom.xml new file mode 100644 index 00000000..c4beacc2 --- /dev/null +++ b/feature-distributed-locking/pom.xml @@ -0,0 +1,130 @@ +<!-- + ============LICENSE_START======================================================= + ONAP Policy Engine - Drools PDP + ================================================================================ + Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>drools-pdp</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>feature-distributed-locking</artifactId> + + <name>feature-distributed-locking</name> + <description>Loadable module that provides distributed locking capability</description> + + <properties> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>zipfile</id> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + <configuration> + <attach>true</attach> + <finalName>${project.artifactId}-${project.version}</finalName> + <descriptors> + <descriptor>src/assembly/assemble_zip.xml</descriptor> + </descriptors> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>prepare-package</phase> + <configuration> + <transitive>false</transitive> + <outputDirectory>${project.build.directory}/assembly/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>true</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <useRepositoryLayout>false</useRepositoryLayout> + <addParentPoms>false</addParentPoms> + <copyPom>false</copyPom> + <includeScope>runtime</includeScope> + <excludeTransitive>true</excludeTransitive> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>policy-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>policy-management</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>[1.4.186,)</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.onap.policy.common</groupId> + <artifactId>utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.onap.policy.common</groupId> + <artifactId>utils-test</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + +</project> diff --git a/feature-distributed-locking/src/assembly/assemble_zip.xml b/feature-distributed-locking/src/assembly/assemble_zip.xml new file mode 100644 index 00000000..2112fbcd --- /dev/null +++ b/feature-distributed-locking/src/assembly/assemble_zip.xml @@ -0,0 +1,75 @@ +<!-- + ============LICENSE_START======================================================= + feature-distributed-locking + ================================================================================ + Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<!-- Defines how we build the .zip file which is our distribution. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>feature-distributed-locking</id> + <formats> + <format>zip</format> + </formats> + + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>target</directory> + <outputDirectory>lib/feature</outputDirectory> + <includes> + <include>feature-distributed-locking-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>target/assembly/lib</directory> + <outputDirectory>lib/dependencies</outputDirectory> + <includes> + <include>*.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/feature/config</directory> + <outputDirectory>config</outputDirectory> + <fileMode>0644</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/bin</directory> + <outputDirectory>bin</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/db</directory> + <outputDirectory>db</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/install</directory> + <outputDirectory>install</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + </fileSets> + +</assembly> diff --git a/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties new file mode 100644 index 00000000..ee4aa474 --- /dev/null +++ b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties @@ -0,0 +1,34 @@ +### +# ============LICENSE_START======================================================= + # feature-distributed-locking +# ================================================================================ +# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +#Database properties +#javax.persistence.jdbc.driver= org.mariadb.jdbc.Driver +#javax.persistence.jdbc.url=jdbc:mariadb://${{SQL_HOST}}:3306/locks +#javax.persistence.jdbc.user=${{SQL_USER}} +#javax.persistence.jdbc.password=${{SQL_PASSWORD}} + +#This value is added to System.currentTimeMs to +#set expirationTime when a lock is obtained. +#distributed.locking.lock.aging=1000 + +#The frequency (in milliseconds) that the heartbeat +#thread refreshes locks owned by the current host +#distributed.locking.heartbeat.interval=5000 + diff --git a/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql new file mode 100644 index 00000000..cd1b815d --- /dev/null +++ b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql @@ -0,0 +1,20 @@ +# ============LICENSE_START======================================================= +# feature-distributed-locking +# ================================================================================ +# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +use pooling; +drop table if exists locks;
\ No newline at end of file diff --git a/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql new file mode 100644 index 00000000..be56d35e --- /dev/null +++ b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql @@ -0,0 +1,23 @@ +# ============LICENSE_START======================================================= +# feature-distributed-locking +# ================================================================================ +# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + + set foreign_key_checks=0; + + CREATE TABLE if not exists pooling.locks (resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), expirationTime BIGINT, PRIMARY KEY (resourceId), INDEX idx_expirationTime(expirationTime), INDEX idx_host(host)); + + set foreign_key_checks=1;
\ No newline at end of file diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java new file mode 100644 index 00000000..cc7a7a12 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java @@ -0,0 +1,158 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.onap.policy.drools.core.lock.LockRequestFuture; +import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI; +import org.onap.policy.drools.features.PolicyEngineFeatureAPI; +import org.onap.policy.drools.persistence.SystemPersistence; +import org.onap.policy.drools.system.PolicyEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistributedLockingFeature implements PolicyEngineFeatureAPI, PolicyResourceLockFeatureAPI { + + /** + * Logger instance + */ + private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class); + + /** + * Properties Configuration Name + */ + public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking"; + + /** + * Properties for locking feature + */ + private DistributedLockingProperties lockProps; + + /** + *ScheduledExecutorService for LockHeartbeat + */ + private ScheduledExecutorService scheduledExecutorService; + + /** + * UUID + */ + private static final UUID uuid = UUID.randomUUID(); + + /** + * Config directory + */ + @Override + public int getSequenceNumber() { + return 1000; + } + + @Override + public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) { + + TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps); + + return new LockRequestFuture(resourceId, owner, tLock.lock()); + + } + + @Override + public Boolean beforeUnlock(String resourceId, String owner) { + TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps); + + return tLock.unlock(); + } + + @Override + public Boolean beforeIsLockedBy(String resourceId, String owner) { + TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps); + + return tLock.isActive(); + } + + @Override + public Boolean beforeIsLocked(String resourceId) { + TargetLock tLock = new TargetLock(resourceId, this.uuid, "dummyOwner", lockProps); + + return tLock.isLocked(); + } + + @Override + public boolean afterStart(PolicyEngine engine) { + + try { + this.lockProps = new DistributedLockingProperties(SystemPersistence.manager.getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME)); + } catch (PropertyException e) { + logger.error("DistributedLockingFeature feature properies have not been loaded", e); + throw new DistributedLockingFeatureException(e); + } + + long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty(); + + cleanLockTable(); + Heartbeat heartbeat = new Heartbeat(this.uuid, lockProps); + + this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); + return false; + } + + /** + * This method kills the heartbeat thread and calls refreshLockTable which removes + * any records from the db where the current host is the owner. + */ + @Override + public boolean beforeShutdown(PolicyEngine engine) { + scheduledExecutorService.shutdown(); + cleanLockTable(); + return false; + } + + /** + * This method removes all records owned by the current host from the db. + */ + private void cleanLockTable() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), + lockProps.getDbUser(), + lockProps.getDbPwd()); + PreparedStatement statement = conn.prepareStatement("DELETE FROM pooling.locks WHERE host = ? OR expirationTime < ?"); + ){ + + statement.setString(1, this.uuid.toString()); + statement.setLong(2, System.currentTimeMillis()); + statement.executeUpdate(); + + } catch (SQLException e) { + logger.error("error in refreshLockTable()", e); + } + + } + +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java new file mode 100644 index 00000000..f28ccbc9 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +public class DistributedLockingFeatureException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** + * + * @param e + * exception to be wrapped + */ + public DistributedLockingFeatureException(Exception e) { + super(e); + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java new file mode 100644 index 00000000..139bfb7b --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.distributed.locking; + +import java.util.Properties; + +import org.onap.policy.common.utils.properties.PropertyConfiguration; +import org.onap.policy.common.utils.properties.exception.PropertyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DistributedLockingProperties extends PropertyConfiguration{ + + private static final Logger logger = LoggerFactory.getLogger(DistributedLockingProperties.class); + + /** + * Feature properties all begin with this prefix. + */ + public static final String PREFIX = "distributed.locking."; + + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PWD = "javax.persistence.jdbc.password"; + public static final String AGING_PROPERTY = PREFIX + "lock.aging"; + public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval"; + + /** + * Properties from which this was constructed. + */ + private Properties source; + + /** + * Database driver + */ + @Property(name = DB_DRIVER) + private String dbDriver; + + /** + * Database url + */ + @Property(name = DB_URL) + private String dbUrl; + + /** + * Database user + */ + @Property(name = DB_USER) + private String dbUser; + + /** + * Database password + */ + @Property(name = DB_PWD) + private String dbPwd; + + /** + * Used to set expiration time for lock. + */ + @Property(name = AGING_PROPERTY, defaultValue = "300000") + private long agingProperty; + + /** + * Indicates intervals at which we refresh locks. + */ + @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000") + private long heartBeatIntervalProperty; + + public DistributedLockingProperties(Properties props) throws PropertyException { + super(props); + source = props; + } + + + public Properties getSource() { + return source; + } + + + public String getDbDriver() { + return dbDriver; + } + + + public String getDbUrl() { + return dbUrl; + } + + + public String getDbUser() { + return dbUser; + } + + + public String getDbPwd() { + return dbPwd; + } + + + public long getAgingProperty() { + return agingProperty; + } + + + public long getHeartBeatIntervalProperty() { + return heartBeatIntervalProperty; + } + +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java new file mode 100644 index 00000000..c753dba9 --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Properties; +import java.util.UUID; + +import org.onap.policy.drools.utils.NetworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * This runnable class scans the locks table for all locks owned by this host. + * It refreshes the expiration time of each lock using the locking.distributed.aging + * property + * + */ +public class Heartbeat implements Runnable{ + + private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class); + + /** + * Properties object containing properties needed by class + */ + private DistributedLockingProperties lockProps; + + /** + * UUID + */ + private UUID uuid; + + public Heartbeat(UUID uuid, DistributedLockingProperties lockProps) { + this.lockProps = lockProps; + this.uuid = uuid; + } + + @Override + public void run() { + + long expirationAge = lockProps.getAgingProperty(); + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + PreparedStatement statement = conn + .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) { + + statement.setLong(1, System.currentTimeMillis() + expirationAge); + statement.setString(2, this.uuid.toString()); + statement.executeUpdate(); + } catch (SQLException e) { + logger.error("error in Heartbeat.run()", e); + } + + } +} diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java new file mode 100644 index 00000000..ceaa849f --- /dev/null +++ b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java @@ -0,0 +1,233 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.distributed.locking; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TargetLock { + + private static final Logger logger = LoggerFactory.getLogger(TargetLock.class); + + /** + * The Target resource we want to lock + */ + private String resourceId; + + /** + * Properties object containing properties needed by class + */ + private DistributedLockingProperties lockProps; + + /** + * UUID + */ + private UUID uuid; + + /** + * Owner + */ + private String owner; + + /** + * Constructs a TargetLock object. + * + * @param resourceId ID of the entity we want to lock + * @param lockProps Properties object containing properties needed by class + */ + public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps) { + this.resourceId = resourceId; + this.uuid = uuid; + this.owner = owner; + this.lockProps = lockProps; + } + + /** + * obtain a lock + */ + public boolean lock() { + + return grabLock(); + } + + /** + * Unlock a resource by deleting it's associated record in the db + */ + public boolean unlock() { + return deleteLock(); + } + + /** + * "Grabs" lock by attempting to insert a new record in the db. + * If the insert fails due to duplicate key error resource is already locked + * so we call secondGrab. + */ + private boolean grabLock() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + // try to insert a record into the table(thereby grabbing the lock) + PreparedStatement statement = conn + .prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) { + statement.setString(1, this.resourceId); + statement.setString(2, this.uuid.toString()); + statement.setString(3, this.owner); + statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty()); + + statement.executeUpdate(); + } catch (SQLException e) { + logger.error("error in TargetLock.grabLock()", e); + return secondGrab(); + } + + return true; + } + + /** + * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired. + * If that fails, it attempts to insert a new record again + */ + private boolean secondGrab() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?"); + + PreparedStatement insertStatement = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) { + + updateStatement.setString(1, this.uuid.toString()); + updateStatement.setString(2, this.owner); + updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty()); + updateStatement.setLong(4, System.currentTimeMillis()); + updateStatement.setString(5, this.resourceId); + + // The lock was expired and we grabbed it. + // return true + if (updateStatement.executeUpdate() == 1) { + return true; + } + // If our update does not return 1 row, the lock either has not expired + // or it was removed. Try one last grab + else { + insertStatement.setString(1, this.resourceId); + insertStatement.setString(2, this.uuid.toString()); + insertStatement.setString(3, this.owner); + insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty()); + + // If our insert returns 1 we successfully grabbed the lock + return (insertStatement.executeUpdate() == 1); + } + + } catch (SQLException e) { + logger.error("error in TargetLock.secondGrab()", e); + return false; + } + + } + + /** + *To remove a lock we simply delete the record from the db + */ + private boolean deleteLock() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement deleteStatement = conn + .prepareStatement("DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?");) { + + deleteStatement.setString(1, this.resourceId); + deleteStatement.setString(2, this.owner); + deleteStatement.setString(3, this.uuid.toString()); + + return (deleteStatement.executeUpdate() == 1); + + } catch (SQLException e) { + logger.error("error in TargetLock.deleteLock()", e); + return false; + } + + } + + /** + * Is the lock active + */ + public boolean isActive() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement selectStatement = conn + .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= ?");) { + { + selectStatement.setString(1, this.resourceId); + selectStatement.setString(2, this.uuid.toString()); + selectStatement.setString(3, this.owner); + selectStatement.setLong(4, System.currentTimeMillis()); + + ResultSet result = selectStatement.executeQuery(); + + // This will return true if the + // query returned at least one row + return result.first(); + + } + } catch (SQLException e) { + logger.error("error in TargetLock.isActive()", e); + return false; + } + } + + /** + * Is the resource locked + */ + public boolean isLocked() { + + try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(), + lockProps.getDbPwd()); + + PreparedStatement selectStatement = conn + .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= ?");) { + { + selectStatement.setString(1, this.resourceId); + selectStatement.setLong(2, System.currentTimeMillis()); + ResultSet result = selectStatement.executeQuery(); + + // This will return true if the + // query returned at least one row + return result.first(); + + } + } catch (SQLException e) { + logger.error("error in TargetLock.isActive()", e); + return false; + } + } + +} diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI new file mode 100644 index 00000000..19bdf505 --- /dev/null +++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI @@ -0,0 +1 @@ +org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI new file mode 100644 index 00000000..19bdf505 --- /dev/null +++ b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI @@ -0,0 +1 @@ +org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java new file mode 100644 index 00000000..ea53e522 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking.test; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.distributed.locking.DistributedLockingFeatureException; + +public class DistributedLockingFeatureExceptionTest extends ExceptionsTester{ + + @Test + public void test() { + assertEquals(1, test(DistributedLockingFeatureException.class)); + } + +} diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java new file mode 100644 index 00000000..e624afb9 --- /dev/null +++ b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java @@ -0,0 +1,220 @@ +/* + * ============LICENSE_START======================================================= + * feature-distributed-locking + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distributed.locking.test; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.distributed.locking.DistributedLockingFeature; +import org.onap.policy.drools.persistence.SystemPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class TargetLockTest { + private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class); + private static final String DB_CONNECTION = "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling"; + private static final String DB_USER = "user"; + private static final String DB_PASSWORD = "password"; + private static Connection conn = null; + private static DistributedLockingFeature distLockFeat; + + @BeforeClass + public static void setup() { + getDBConnection(); + createTable(); + SystemPersistence.manager.setConfigurationDir("src/test/resources"); + distLockFeat = new DistributedLockingFeature(); + distLockFeat.afterStart(null); + + } + + @AfterClass + public static void cleanUp() { + distLockFeat.beforeShutdown(null); + try { + conn.close(); + } catch (SQLException e) { + logger.error("Error in TargetLockTest.cleanUp()", e); + } + } + + @Before + public void wipeDb() { + + try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks");){ + lockDelete.executeUpdate(); + } catch (SQLException e) { + logger.error("Error in TargetLockTest.wipeDb()", e); + throw new RuntimeException(e); + } + + } + + @Test + public void testGrabLockSuccess() throws InterruptedException, ExecutionException { + assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get()); + + //attempt to grab expiredLock + try (PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE resourceId = ?");) + { + updateStatement.setLong(1, System.currentTimeMillis() - 1000); + updateStatement.setString(2, "resource1"); + updateStatement.executeUpdate(); + + } catch (SQLException e) { + logger.error("Error in TargetLockTest.testGrabLockSuccess()", e); + throw new RuntimeException(e); + } + + assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get()); + } + + @Test + public void testExpiredLocks() throws InterruptedException, ExecutionException { + CountDownLatch latch = new CountDownLatch(1); + + distLockFeat.beforeLock("resource1", "owner1", null); + + try { + latch.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Error in testExpiredLocks", e); + } + + //Heartbeat should keep it active + assertFalse(distLockFeat.beforeLock("resource1", "owner1", null).get()); + } + + @Test + public void testGrabLockFail() throws InterruptedException, ExecutionException { + CountDownLatch latch = new CountDownLatch(1); + + distLockFeat.beforeLock("resource1", "owner1", null); + + try { + latch.await(10, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Error in testExpiredLocks", e); + } + assertFalse(distLockFeat.beforeLock("resource1", "owner1", null).get()); + + } + + + @Test + public void testUnlock() throws InterruptedException, ExecutionException { + distLockFeat.beforeLock("resource1", "owner1", null); + + assertTrue(distLockFeat.beforeUnlock("resource1", "owner1")); + assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get()); + } + + @Test + public void testIsActive() { + assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1")); + distLockFeat.beforeLock("resource1", "owner1", null); + assertTrue(distLockFeat.beforeIsLockedBy("resource1", "owner1")); + assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner2")); + + // isActive on expiredLock + try (PreparedStatement updateStatement = conn + .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE resourceId = ?");) { + updateStatement.setLong(1, System.currentTimeMillis() - 5000); + updateStatement.setString(2, "resource1"); + updateStatement.executeUpdate(); + + } catch (SQLException e) { + logger.error("Error in TargetLockTest.testIsActive()", e); + throw new RuntimeException(e); + } + + assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1")); + + distLockFeat.beforeLock("resource1", "owner1", null); + //Unlock record, next isActive attempt should fail + distLockFeat.beforeUnlock("resource1", "owner1"); + assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1")); + + } + + @Test + public void testHeartbeat() { + CountDownLatch latch = new CountDownLatch(1); + + distLockFeat.beforeLock("resource1", "owner1", null); + try { + latch.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Error in testExpiredLocks", e); + } + + // This test always returns true. + assertTrue(distLockFeat.beforeIsLocked("resource1")); + } + + @Test + public void unlockBeforeLock() { + assertFalse(distLockFeat.beforeUnlock("resource1", "owner1")); + distLockFeat.beforeLock("resource1", "owner1", null); + assertTrue(distLockFeat.beforeUnlock("resource1", "owner1")); + assertFalse(distLockFeat.beforeUnlock("resource1", "owner1")); + } + + @Test + public void testIsLocked() { + assertFalse(distLockFeat.beforeIsLocked("resource1")); + distLockFeat.beforeLock("resource1", "owner1", null); + assertTrue(distLockFeat.beforeIsLocked("resource1")); + + } + + private static void getDBConnection() { + try { + conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + } catch (SQLException e) { + logger.error("Error in TargetLockTest.getDBConnection()", e); + } + } + + private static void createTable() { + String createString = "create table if not exists pooling.locks (resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), expirationTime BIGINT, PRIMARY KEY (resourceId))"; + try (PreparedStatement createStmt = conn.prepareStatement(createString);) { + createStmt.executeUpdate(); + + } catch (SQLException e) { + logger.error("Error in TargetLockTest.createTable()", e); + } + } + + +} diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties new file mode 100644 index 00000000..d1a07e82 --- /dev/null +++ b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties @@ -0,0 +1,26 @@ +### +# ============LICENSE_START======================================================= + # feature-distributed-locking +# ================================================================================ +# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +javax.persistence.jdbc.driver=org.h2.Driver +javax.persistence.jdbc.url=jdbc:h2:mem:pooling +javax.persistence.jdbc.user=user +javax.persistence.jdbc.password=password +distributed.locking.lock.aging=150 +distributed.locking.heartbeat.interval=500
\ No newline at end of file diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java new file mode 100644 index 00000000..46d1ff2d --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java @@ -0,0 +1,261 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Future associated with a lock request. + */ +public class LockRequestFuture implements Future<Boolean> { + + // messages used in exceptions + public static final String MSG_NULL_RESOURCE_ID = "null resourceId"; + public static final String MSG_NULL_OWNER = "null owner"; + + private static Logger logger = LoggerFactory.getLogger(LockRequestFuture.class); + + /** + * The resource on which the lock was requested. + */ + private final String resourceId; + + /** + * The owner for which the lock was requested. + */ + private final String owner; + + /** + * Possible states for this future. + */ + private enum State { + WAITING, CANCELLED, ACQUIRED, DENIED + }; + + private AtomicReference<State> state; + + /** + * Used to wait for the lock request to complete. + */ + private CountDownLatch waiter = new CountDownLatch(1); + + /** + * Callback to invoke once the lock is acquired (or denied). This is set to + * {@code null} once the callback has been invoked. + */ + private final AtomicReference<Callback> callback; + + /** + * Constructs a future that has already been completed. + * + * @param resourceId + * @param owner owner for which the lock was requested + * @param locked {@code true} if the lock has been acquired, {@code false} if the lock + * request has been denied + * @throws IllegalArgumentException if any of the arguments are {@code null} + */ + public LockRequestFuture(String resourceId, String owner, boolean locked) { + if (resourceId == null) { + throw makeNullArgException(MSG_NULL_RESOURCE_ID); + } + + if (owner == null) { + throw makeNullArgException(MSG_NULL_OWNER); + } + + this.resourceId = resourceId; + this.owner = owner; + this.callback = new AtomicReference<>(null); + this.state = new AtomicReference<>(locked ? State.ACQUIRED : State.DENIED); + + // indicate that it's already done + this.waiter.countDown(); + } + + /** + * Constructs a future that has not yet been completed. + * + * @param resourceId + * @param owner owner for which the lock was requested + * @param callback item to be wrapped + * @throws IllegalArgumentException if the resourceId or owner is {@code null} + */ + public LockRequestFuture(String resourceId, String owner, Callback callback) { + if (resourceId == null) { + throw makeNullArgException(MSG_NULL_RESOURCE_ID); + } + + if (owner == null) { + throw makeNullArgException(MSG_NULL_OWNER); + } + + this.resourceId = resourceId; + this.owner = owner; + this.callback = new AtomicReference<>(callback); + this.state = new AtomicReference<>(State.WAITING); + } + + public String getResourceId() { + return resourceId; + } + + public String getOwner() { + return owner; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = state.compareAndSet(State.WAITING, State.CANCELLED); + + if (cancelled) { + logger.info("resource {} owner {} cancelled lock request", resourceId, owner); + waiter.countDown(); + } + + return cancelled; + } + + /** + * Indicates that the lock has been acquired or denied. + * + * @param locked {@code true} if the lock has been acquired, {@code false} if the lock + * request has been denied + * + * @return {@code true} if it was not already completed, {@code false} otherwise + */ + protected boolean setLocked(boolean locked) { + State newState = (locked ? State.ACQUIRED : State.DENIED); + if (state.compareAndSet(State.WAITING, newState)) { + waiter.countDown(); + return true; + + } else { + return false; + } + } + + @Override + public boolean isCancelled() { + return (state.get() == State.CANCELLED); + } + + @Override + public boolean isDone() { + return (state.get() != State.WAITING); + } + + /** + * Gets the current status of the lock. + * + * @return {@code true} if the lock has been acquired, {@code false} otherwise + */ + public boolean isLocked() { + return (state.get() == State.ACQUIRED); + } + + /** + * @return {@code true} if the lock was acquired, {@code false} if it was denied + */ + @Override + public Boolean get() throws CancellationException, InterruptedException { + waiter.await(); + + switch (state.get()) { + case CANCELLED: + throw new CancellationException("lock request was cancelled"); + case ACQUIRED: + return true; + default: + // should only be DENIED at this point + return false; + } + } + + /** + * @return {@code true} if the lock was acquired, {@code false} if it was denied + */ + @Override + public Boolean get(long timeout, TimeUnit unit) + throws CancellationException, InterruptedException, TimeoutException { + + if (!waiter.await(timeout, unit)) { + throw new TimeoutException("lock request did not complete in time"); + } + + return get(); + } + + /** + * Invokes the callback, indicating whether or not the lock was acquired. + * + * @throws IllegalStateException if the request was previously cancelled, has not yet + * completed, or if the callback has already been invoked + */ + protected void invokeCallback() { + boolean locked; + + switch (state.get()) { + case ACQUIRED: + locked = true; + break; + case DENIED: + locked = false; + break; + case CANCELLED: + throw new IllegalStateException("cancelled lock request callback"); + default: + // only other choice is WAITING + throw new IllegalStateException("incomplete lock request callback"); + } + + Callback cb = callback.get(); + if (cb == null || !callback.compareAndSet(cb, null)) { + throw new IllegalStateException("already invoked lock request callback"); + } + + + // notify the callback + try { + cb.set(locked); + + } catch (RuntimeException e) { + logger.info("lock request callback for resource {} owner {} threw an exception", resourceId, owner, e); + } + } + + /** + * Makes an exception for when an argument is {@code null}. + * + * @param msg exception message + * @return a new Exception + */ + public static IllegalArgumentException makeNullArgException(String msg) { + return new IllegalArgumentException(msg); + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java new file mode 100644 index 00000000..718ed5e9 --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java @@ -0,0 +1,190 @@ +/* + * ============LICENSE_START======================================================= + * api-resource-locks + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import java.util.concurrent.Future; +import org.onap.policy.drools.utils.OrderedService; +import org.onap.policy.drools.utils.OrderedServiceImpl; + +/** + * Resource locks. Each lock has an "owner", which is intended to be unique across a + * single instance of a running PolicyEngine. + * <p> + * This interface provides a way to invoke optional features at various points in the + * code. At appropriate points in the application, the code iterates through this list, + * invoking these optional methods. + * <p> + * Implementers may choose to implement a level of locking appropriate to the application. + * For instance, they may choose to implement an engine-wide locking scheme, or they may + * choose to implement a global locking scheme (e.g., through a shared DB). + */ +public interface PolicyResourceLockFeatureAPI extends OrderedService { + + /** + * 'FeatureAPI.impl.getList()' returns an ordered list of objects implementing the + * 'FeatureAPI' interface. + */ + public static OrderedServiceImpl<PolicyResourceLockFeatureAPI> impl = + new OrderedServiceImpl<>(PolicyResourceLockFeatureAPI.class); + + /** + * Callback that an implementer invokes when a lock is acquired (or denied), + * asynchronously. The implementer invokes the method to indicate that the lock was + * acquired (or denied). + */ + @FunctionalInterface + public static interface Callback { + + /** + * + * @param locked {@code true} if the lock was acquired, {@code false} if the lock + * was denied + */ + public void set(boolean locked); + } + + /** + * This method is called before a lock is acquired on a resource. If a callback is + * provided, and the implementer is unable to acquire the lock immediately, then the + * implementer will invoke the callback once the lock is acquired. If the implementer + * handled the request, then it will return a future, which may be in one of three + * states: + * <dl> + * <dt>isDone()=true and get()=true</dt> + * <dd>the lock has been acquired; the callback may or may not have been invoked</dd> + * <dt>isDone()=true and get()=false</dt> + * <dd>the lock request has been denied; the callback may or may not have been + * invoked</dd> + * <dt>isDone()=false</dt> + * <dd>the lock was not immediately available and a callback was provided. The + * callback will be invoked once the lock is acquired (or denied). In this case, the + * future may be used to cancel the request</dd> + * </dl> + * + * @param resourceId + * @param owner + * @param callback function to invoke, if the requester wishes to wait for the lock to + * come available, {@code null} to provide immediate replies + * @return a future for the lock, if the implementer handled the request, {@code null} + * if additional locking logic should be performed + * @throws IllegalStateException if the owner already holds the lock or is already in + * the queue to get the lock + */ + public default Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) { + return null; + } + + /** + * This method is called after a lock for a resource has been acquired or denied. This + * may be invoked immediately, if the status can be determined immediately, or it may + * be invoked asynchronously, once the status has been determined. + * + * @param resourceId + * @param owner + * @param locked {@code true} if the lock was acquired, {@code false} if it was denied + * @return {@code true} if the implementer handled the request, {@code false} + * otherwise + */ + public default boolean afterLock(String resourceId, String owner, boolean locked) { + return false; + } + + /** + * This method is called before a lock on a resource is released. + * + * @param resourceId + * @param owner + * <dt>true</dt> + * <dd>the implementer handled the request and found the resource to be locked + * by the given owner; the resource was unlocked and no additional locking + * logic should be performed</dd> + * <dt>false</dt> + * <dd>the implementer handled the request and found the resource was not + * locked by given the owner; no additional locking logic should be + * performed</dd> + * <dt>null</dt> + * <dd>the implementer did not handle the request; additional locking logic + * <i>should be</i> performed + * </dl> + */ + public default Boolean beforeUnlock(String resourceId, String owner) { + return null; + } + + /** + * This method is called after a lock on a resource is released. + * + * @param resourceId + * @param owner + * @param unlocked {@code true} if the lock was released, {@code false} if the owner + * did not have a lock on the resource + * @return {@code true} if the implementer handled the request, {@code false} + * otherwise + */ + public default boolean afterUnlock(String resourceId, String owner, boolean unlocked) { + return false; + } + + /** + * This method is called before a check is made to determine if a resource is locked. + * + * @param resourceId + * @return + * <dl> + * <dt>true</dt> + * <dd>the implementer handled the request and found the resource to be + * locked; no additional locking logic should be performed</dd> + * <dt>false</dt> + * <dd>the implementer handled the request and found the resource was not + * locked; no additional locking logic should be performed</dd> + * <dt>null</dt> + * <dd>the implementer did not handle the request; additional locking logic + * <i>should be</i> performed + * </dl> + */ + public default Boolean beforeIsLocked(String resourceId) { + return null; + } + + /** + * This method is called before a check is made to determine if a particular owner + * holds the lock on a resource. + * + * @param resourceId + * @param owner + * @return + * <dl> + * <dt>true</dt> + * <dd>the implementer handled the request and found the resource to be locked + * by the given owner; no additional locking logic should be performed</dd> + * <dt>false</dt> + * <dd>the implementer handled the request and found the resource was not + * locked by given the owner; no additional locking logic should be + * performed</dd> + * <dt>null</dt> + * <dd>the implementer did not handle the request; additional locking logic + * <i>should be</i> performed + * </dl> + */ + public default Boolean beforeIsLockedBy(String resourceId, String owner) { + return null; + } +} @@ -89,6 +89,7 @@ <module>api-active-standby-management</module> <module>feature-active-standby-management</module> <module>feature-simulators</module> + <module>feature-distributed-locking</module> <module>packages</module> </modules> |