aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java
diff options
context:
space:
mode:
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java98
1 files changed, 78 insertions, 20 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java
index 6892a503..18507ac2 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java
@@ -21,9 +21,11 @@
package org.onap.ccsdk.oran.a1policymanagementservice.repository;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
+
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,29 +41,81 @@ import reactor.core.publisher.MonoSink;
public class Lock {
private static final Logger logger = LoggerFactory.getLogger(Lock.class);
- private boolean isExclusive = false;
+ boolean isExclusive = false;
private int lockCounter = 0;
- private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+ final Queue<LockRequest> lockRequestQueue = new LinkedList<>();
private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
public enum LockType {
EXCLUSIVE, SHARED
}
- /** The caller thread will be blocked util the lock is granted. */
- public synchronized void lockBlocking(LockType locktype) {
- while (!tryLock(locktype)) {
- this.waitForUnlock();
+ /**
+ * A grant is achieved when the lock is granted.
+ * It can be used for unlocking.
+ */
+ public static class Grant {
+ private final Lock lock;
+ private boolean unlocked = false;
+ @Getter
+ private final String label;
+
+ Grant(Lock lock, String label) {
+ this.lock = lock;
+ this.label = label;
+ }
+
+ /**
+ * reactive unlocking. Submits the lock.
+ *
+ * @return the lock
+ */
+ public Mono<Lock> unlock() {
+ check();
+ return this.lock.unlock();
+ }
+
+ /**
+ * Synchronuous unlocking
+ */
+ public void unlockBlocking() {
+ check();
+ this.lock.unlockBlocking();
+ }
+
+ private void check() {
+ if (unlocked) {
+ logger.error("Lock already unlocked");
+ }
+ unlocked = true;
}
}
- /** Reactive version. The Lock will be emitted when the lock is granted */
- public synchronized Mono<Lock> lock(LockType lockType) {
+ /**
+ * Reactive lock. The Lock will be emitted when the lock is granted
+ *
+ * @param lockType type of lock (exclusive/shared)
+ * @param label a label that will be attached to the request. Will be passed
+ * back in the Grant
+ * @return a Grant that cane be used only to unlock.
+ */
+ public synchronized Mono<Grant> lock(LockType lockType, String label) {
if (tryLock(lockType)) {
- return Mono.just(this);
+ return Mono.just(new Grant(this, label));
} else {
- return Mono.create(monoSink -> addToQueue(monoSink, lockType));
+ return Mono.create(monoSink -> addToQueue(monoSink, lockType, label));
+ }
+ }
+
+ /**
+ * A synchronuous variant of locking. The caller thread will be blocked util the
+ * lock is granted.
+ */
+ public synchronized Grant lockBlocking(LockType locktype, String label) {
+ while (!tryLock(locktype)) {
+ this.waitForUnlock();
}
+ return new Grant(this, label);
}
public Mono<Lock> unlock() {
@@ -97,19 +151,20 @@ public class Lock {
private void processQueuedEntries() {
List<LockRequest> granted = new ArrayList<>();
- for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
- LockRequest request = i.next();
+ while (!lockRequestQueue.isEmpty()) {
+ LockRequest request = lockRequestQueue.element();
if (tryLock(request.lockType)) {
- i.remove();
+ lockRequestQueue.remove();
granted.add(request);
+ } else {
+ break; // Avoid starvation
}
}
callbackProcessor.addAll(granted);
}
- private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
- lockRequestQueue.add(new LockRequest(callback, lockType, this));
- processQueuedEntries();
+ private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) {
+ lockRequestQueue.add(new LockRequest(callback, lockType, this, label));
}
@SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
@@ -138,14 +193,16 @@ public class Lock {
* Represents a queued lock request
*/
private static class LockRequest {
- final MonoSink<Lock> callback;
+ final MonoSink<Grant> callback;
final LockType lockType;
final Lock lock;
+ final String label;
- LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
+ LockRequest(MonoSink<Grant> callback, LockType lockType, Lock lock, String label) {
this.callback = callback;
this.lockType = lockType;
this.lock = lock;
+ this.label = label;
}
}
@@ -171,7 +228,8 @@ public class Lock {
try {
while (true) {
for (LockRequest request : consume()) {
- request.callback.success(request.lock);
+ Grant g = new Grant(request.lock, request.label);
+ request.callback.success(g);
}
waitForNewEntries();
}