diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java | 98 |
1 files changed, 78 insertions, 20 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java index 6892a503..18507ac2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java @@ -21,9 +21,11 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import java.util.ArrayList; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Queue; + +import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,29 +41,81 @@ import reactor.core.publisher.MonoSink; public class Lock { private static final Logger logger = LoggerFactory.getLogger(Lock.class); - private boolean isExclusive = false; + boolean isExclusive = false; private int lockCounter = 0; - private final List<LockRequest> lockRequestQueue = new LinkedList<>(); + final Queue<LockRequest> lockRequestQueue = new LinkedList<>(); private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); public enum LockType { EXCLUSIVE, SHARED } - /** The caller thread will be blocked util the lock is granted. */ - public synchronized void lockBlocking(LockType locktype) { - while (!tryLock(locktype)) { - this.waitForUnlock(); + /** + * A grant is achieved when the lock is granted. + * It can be used for unlocking. + */ + public static class Grant { + private final Lock lock; + private boolean unlocked = false; + @Getter + private final String label; + + Grant(Lock lock, String label) { + this.lock = lock; + this.label = label; + } + + /** + * reactive unlocking. Submits the lock. + * + * @return the lock + */ + public Mono<Lock> unlock() { + check(); + return this.lock.unlock(); + } + + /** + * Synchronuous unlocking + */ + public void unlockBlocking() { + check(); + this.lock.unlockBlocking(); + } + + private void check() { + if (unlocked) { + logger.error("Lock already unlocked"); + } + unlocked = true; } } - /** Reactive version. The Lock will be emitted when the lock is granted */ - public synchronized Mono<Lock> lock(LockType lockType) { + /** + * Reactive lock. The Lock will be emitted when the lock is granted + * + * @param lockType type of lock (exclusive/shared) + * @param label a label that will be attached to the request. Will be passed + * back in the Grant + * @return a Grant that cane be used only to unlock. + */ + public synchronized Mono<Grant> lock(LockType lockType, String label) { if (tryLock(lockType)) { - return Mono.just(this); + return Mono.just(new Grant(this, label)); } else { - return Mono.create(monoSink -> addToQueue(monoSink, lockType)); + return Mono.create(monoSink -> addToQueue(monoSink, lockType, label)); + } + } + + /** + * A synchronuous variant of locking. The caller thread will be blocked util the + * lock is granted. + */ + public synchronized Grant lockBlocking(LockType locktype, String label) { + while (!tryLock(locktype)) { + this.waitForUnlock(); } + return new Grant(this, label); } public Mono<Lock> unlock() { @@ -97,19 +151,20 @@ public class Lock { private void processQueuedEntries() { List<LockRequest> granted = new ArrayList<>(); - for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) { - LockRequest request = i.next(); + while (!lockRequestQueue.isEmpty()) { + LockRequest request = lockRequestQueue.element(); if (tryLock(request.lockType)) { - i.remove(); + lockRequestQueue.remove(); granted.add(request); + } else { + break; // Avoid starvation } } callbackProcessor.addAll(granted); } - private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) { - lockRequestQueue.add(new LockRequest(callback, lockType, this)); - processQueuedEntries(); + private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) { + lockRequestQueue.add(new LockRequest(callback, lockType, this, label)); } @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop @@ -138,14 +193,16 @@ public class Lock { * Represents a queued lock request */ private static class LockRequest { - final MonoSink<Lock> callback; + final MonoSink<Grant> callback; final LockType lockType; final Lock lock; + final String label; - LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) { + LockRequest(MonoSink<Grant> callback, LockType lockType, Lock lock, String label) { this.callback = callback; this.lockType = lockType; this.lock = lock; + this.label = label; } } @@ -171,7 +228,8 @@ public class Lock { try { while (true) { for (LockRequest request : consume()) { - request.callback.success(request.lock); + Grant g = new Grant(request.lock, request.label); + request.callback.success(g); } waitForNewEntries(); } |