diff options
author | Magnusen, Drew (dm741q) <dm741q@att.com> | 2018-03-21 16:44:45 -0500 |
---|---|---|
committer | Magnusen, Drew (dm741q) <dm741q@att.com> | 2018-04-03 14:05:18 -0500 |
commit | fff9b57f7411deb798431bd625944fcfdbe053ac (patch) | |
tree | c1d7b2d23df54a61a15cd0804f7cce3b42c527f7 /policy-core | |
parent | 54bc3867539264a518c88772e82ea8070ef97c79 (diff) |
Implementation of distributed locking feature
This feature is a very basic implementation of a distributed locking
system.
Issue-ID: POLICY-699
Change-Id: I012fd37926ccbbdd87a3e4acb2788b53680115f0
Signed-off-by: Magnusen, Drew (dm741q) <dm741q@att.com>
Diffstat (limited to 'policy-core')
2 files changed, 451 insertions, 0 deletions
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java new file mode 100644 index 00000000..46d1ff2d --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java @@ -0,0 +1,261 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Future associated with a lock request. + */ +public class LockRequestFuture implements Future<Boolean> { + + // messages used in exceptions + public static final String MSG_NULL_RESOURCE_ID = "null resourceId"; + public static final String MSG_NULL_OWNER = "null owner"; + + private static Logger logger = LoggerFactory.getLogger(LockRequestFuture.class); + + /** + * The resource on which the lock was requested. + */ + private final String resourceId; + + /** + * The owner for which the lock was requested. + */ + private final String owner; + + /** + * Possible states for this future. + */ + private enum State { + WAITING, CANCELLED, ACQUIRED, DENIED + }; + + private AtomicReference<State> state; + + /** + * Used to wait for the lock request to complete. + */ + private CountDownLatch waiter = new CountDownLatch(1); + + /** + * Callback to invoke once the lock is acquired (or denied). This is set to + * {@code null} once the callback has been invoked. + */ + private final AtomicReference<Callback> callback; + + /** + * Constructs a future that has already been completed. + * + * @param resourceId + * @param owner owner for which the lock was requested + * @param locked {@code true} if the lock has been acquired, {@code false} if the lock + * request has been denied + * @throws IllegalArgumentException if any of the arguments are {@code null} + */ + public LockRequestFuture(String resourceId, String owner, boolean locked) { + if (resourceId == null) { + throw makeNullArgException(MSG_NULL_RESOURCE_ID); + } + + if (owner == null) { + throw makeNullArgException(MSG_NULL_OWNER); + } + + this.resourceId = resourceId; + this.owner = owner; + this.callback = new AtomicReference<>(null); + this.state = new AtomicReference<>(locked ? State.ACQUIRED : State.DENIED); + + // indicate that it's already done + this.waiter.countDown(); + } + + /** + * Constructs a future that has not yet been completed. + * + * @param resourceId + * @param owner owner for which the lock was requested + * @param callback item to be wrapped + * @throws IllegalArgumentException if the resourceId or owner is {@code null} + */ + public LockRequestFuture(String resourceId, String owner, Callback callback) { + if (resourceId == null) { + throw makeNullArgException(MSG_NULL_RESOURCE_ID); + } + + if (owner == null) { + throw makeNullArgException(MSG_NULL_OWNER); + } + + this.resourceId = resourceId; + this.owner = owner; + this.callback = new AtomicReference<>(callback); + this.state = new AtomicReference<>(State.WAITING); + } + + public String getResourceId() { + return resourceId; + } + + public String getOwner() { + return owner; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = state.compareAndSet(State.WAITING, State.CANCELLED); + + if (cancelled) { + logger.info("resource {} owner {} cancelled lock request", resourceId, owner); + waiter.countDown(); + } + + return cancelled; + } + + /** + * Indicates that the lock has been acquired or denied. + * + * @param locked {@code true} if the lock has been acquired, {@code false} if the lock + * request has been denied + * + * @return {@code true} if it was not already completed, {@code false} otherwise + */ + protected boolean setLocked(boolean locked) { + State newState = (locked ? State.ACQUIRED : State.DENIED); + if (state.compareAndSet(State.WAITING, newState)) { + waiter.countDown(); + return true; + + } else { + return false; + } + } + + @Override + public boolean isCancelled() { + return (state.get() == State.CANCELLED); + } + + @Override + public boolean isDone() { + return (state.get() != State.WAITING); + } + + /** + * Gets the current status of the lock. + * + * @return {@code true} if the lock has been acquired, {@code false} otherwise + */ + public boolean isLocked() { + return (state.get() == State.ACQUIRED); + } + + /** + * @return {@code true} if the lock was acquired, {@code false} if it was denied + */ + @Override + public Boolean get() throws CancellationException, InterruptedException { + waiter.await(); + + switch (state.get()) { + case CANCELLED: + throw new CancellationException("lock request was cancelled"); + case ACQUIRED: + return true; + default: + // should only be DENIED at this point + return false; + } + } + + /** + * @return {@code true} if the lock was acquired, {@code false} if it was denied + */ + @Override + public Boolean get(long timeout, TimeUnit unit) + throws CancellationException, InterruptedException, TimeoutException { + + if (!waiter.await(timeout, unit)) { + throw new TimeoutException("lock request did not complete in time"); + } + + return get(); + } + + /** + * Invokes the callback, indicating whether or not the lock was acquired. + * + * @throws IllegalStateException if the request was previously cancelled, has not yet + * completed, or if the callback has already been invoked + */ + protected void invokeCallback() { + boolean locked; + + switch (state.get()) { + case ACQUIRED: + locked = true; + break; + case DENIED: + locked = false; + break; + case CANCELLED: + throw new IllegalStateException("cancelled lock request callback"); + default: + // only other choice is WAITING + throw new IllegalStateException("incomplete lock request callback"); + } + + Callback cb = callback.get(); + if (cb == null || !callback.compareAndSet(cb, null)) { + throw new IllegalStateException("already invoked lock request callback"); + } + + + // notify the callback + try { + cb.set(locked); + + } catch (RuntimeException e) { + logger.info("lock request callback for resource {} owner {} threw an exception", resourceId, owner, e); + } + } + + /** + * Makes an exception for when an argument is {@code null}. + * + * @param msg exception message + * @return a new Exception + */ + public static IllegalArgumentException makeNullArgException(String msg) { + return new IllegalArgumentException(msg); + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java new file mode 100644 index 00000000..718ed5e9 --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java @@ -0,0 +1,190 @@ +/* + * ============LICENSE_START======================================================= + * api-resource-locks + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.lock; + +import java.util.concurrent.Future; +import org.onap.policy.drools.utils.OrderedService; +import org.onap.policy.drools.utils.OrderedServiceImpl; + +/** + * Resource locks. Each lock has an "owner", which is intended to be unique across a + * single instance of a running PolicyEngine. + * <p> + * This interface provides a way to invoke optional features at various points in the + * code. At appropriate points in the application, the code iterates through this list, + * invoking these optional methods. + * <p> + * Implementers may choose to implement a level of locking appropriate to the application. + * For instance, they may choose to implement an engine-wide locking scheme, or they may + * choose to implement a global locking scheme (e.g., through a shared DB). + */ +public interface PolicyResourceLockFeatureAPI extends OrderedService { + + /** + * 'FeatureAPI.impl.getList()' returns an ordered list of objects implementing the + * 'FeatureAPI' interface. + */ + public static OrderedServiceImpl<PolicyResourceLockFeatureAPI> impl = + new OrderedServiceImpl<>(PolicyResourceLockFeatureAPI.class); + + /** + * Callback that an implementer invokes when a lock is acquired (or denied), + * asynchronously. The implementer invokes the method to indicate that the lock was + * acquired (or denied). + */ + @FunctionalInterface + public static interface Callback { + + /** + * + * @param locked {@code true} if the lock was acquired, {@code false} if the lock + * was denied + */ + public void set(boolean locked); + } + + /** + * This method is called before a lock is acquired on a resource. If a callback is + * provided, and the implementer is unable to acquire the lock immediately, then the + * implementer will invoke the callback once the lock is acquired. If the implementer + * handled the request, then it will return a future, which may be in one of three + * states: + * <dl> + * <dt>isDone()=true and get()=true</dt> + * <dd>the lock has been acquired; the callback may or may not have been invoked</dd> + * <dt>isDone()=true and get()=false</dt> + * <dd>the lock request has been denied; the callback may or may not have been + * invoked</dd> + * <dt>isDone()=false</dt> + * <dd>the lock was not immediately available and a callback was provided. The + * callback will be invoked once the lock is acquired (or denied). In this case, the + * future may be used to cancel the request</dd> + * </dl> + * + * @param resourceId + * @param owner + * @param callback function to invoke, if the requester wishes to wait for the lock to + * come available, {@code null} to provide immediate replies + * @return a future for the lock, if the implementer handled the request, {@code null} + * if additional locking logic should be performed + * @throws IllegalStateException if the owner already holds the lock or is already in + * the queue to get the lock + */ + public default Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) { + return null; + } + + /** + * This method is called after a lock for a resource has been acquired or denied. This + * may be invoked immediately, if the status can be determined immediately, or it may + * be invoked asynchronously, once the status has been determined. + * + * @param resourceId + * @param owner + * @param locked {@code true} if the lock was acquired, {@code false} if it was denied + * @return {@code true} if the implementer handled the request, {@code false} + * otherwise + */ + public default boolean afterLock(String resourceId, String owner, boolean locked) { + return false; + } + + /** + * This method is called before a lock on a resource is released. + * + * @param resourceId + * @param owner + * <dt>true</dt> + * <dd>the implementer handled the request and found the resource to be locked + * by the given owner; the resource was unlocked and no additional locking + * logic should be performed</dd> + * <dt>false</dt> + * <dd>the implementer handled the request and found the resource was not + * locked by given the owner; no additional locking logic should be + * performed</dd> + * <dt>null</dt> + * <dd>the implementer did not handle the request; additional locking logic + * <i>should be</i> performed + * </dl> + */ + public default Boolean beforeUnlock(String resourceId, String owner) { + return null; + } + + /** + * This method is called after a lock on a resource is released. + * + * @param resourceId + * @param owner + * @param unlocked {@code true} if the lock was released, {@code false} if the owner + * did not have a lock on the resource + * @return {@code true} if the implementer handled the request, {@code false} + * otherwise + */ + public default boolean afterUnlock(String resourceId, String owner, boolean unlocked) { + return false; + } + + /** + * This method is called before a check is made to determine if a resource is locked. + * + * @param resourceId + * @return + * <dl> + * <dt>true</dt> + * <dd>the implementer handled the request and found the resource to be + * locked; no additional locking logic should be performed</dd> + * <dt>false</dt> + * <dd>the implementer handled the request and found the resource was not + * locked; no additional locking logic should be performed</dd> + * <dt>null</dt> + * <dd>the implementer did not handle the request; additional locking logic + * <i>should be</i> performed + * </dl> + */ + public default Boolean beforeIsLocked(String resourceId) { + return null; + } + + /** + * This method is called before a check is made to determine if a particular owner + * holds the lock on a resource. + * + * @param resourceId + * @param owner + * @return + * <dl> + * <dt>true</dt> + * <dd>the implementer handled the request and found the resource to be locked + * by the given owner; no additional locking logic should be performed</dd> + * <dt>false</dt> + * <dd>the implementer handled the request and found the resource was not + * locked by given the owner; no additional locking logic should be + * performed</dd> + * <dt>null</dt> + * <dd>the implementer did not handle the request; additional locking logic + * <i>should be</i> performed + * </dl> + */ + public default Boolean beforeIsLockedBy(String resourceId, String owner) { + return null; + } +} |