From 82a6252d3d3008c1ee568b1eb85de0701600918d Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 9 Feb 2022 13:36:57 +0100 Subject: Updated Lock to avoid starvation The lock class is made more greedy so all locks are granted in the requested order. Previously, an exclusive lock could be discriminated. Issue-ID: CCSDK-3560 Signed-off-by: PatrikBuhr Change-Id: If2dd171409c58eacbccce9569b2f6694e09992a2 --- .../controllers/v2/PolicyController.java | 28 ++++--- .../controllers/v2/ServiceController.java | 2 +- .../a1policymanagementservice/repository/Lock.java | 98 +++++++++++++++++----- .../tasks/RicSupervision.java | 2 +- .../tasks/RicSynchronizationTask.java | 2 +- .../tasks/ServiceSupervision.java | 5 +- .../controllers/v2/ApplicationTest.java | 11 +-- .../repository/LockTest.java | 44 ++++++---- .../tasks/RicSupervisionTest.java | 5 +- 9 files changed, 137 insertions(+), 60 deletions(-) (limited to 'a1-policy-management/src') diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java index 5f2f6193..125b75a3 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java @@ -43,6 +43,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.VoidResponse; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -195,15 +196,17 @@ public class PolicyController { @PathVariable(Consts.POLICY_ID_PARAM) String policyId) throws EntityNotFoundException { Policy policy = policies.getPolicy(policyId); keepServiceAlive(policy.getOwnerServiceId()); - Ric ric = policy.getRic(); - return ric.getLock().lock(LockType.SHARED) // - .flatMap(notUsed -> assertRicStateIdle(ric)) // + return policy.getRic().getLock().lock(LockType.SHARED, "deletePolicy") // + .flatMap(grant -> deletePolicy(grant, policy)); + } + + Mono> deletePolicy(Lock.Grant grant, Policy policy) { + return assertRicStateIdle(policy.getRic()) // .flatMap(notUsed -> a1ClientFactory.createA1Client(policy.getRic())) // .doOnNext(notUsed -> policies.remove(policy)) // + .doFinally(x -> grant.unlockBlocking()) // .flatMap(client -> client.deletePolicy(policy)) // - .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // - .doOnError(notUsed -> ric.getLock().unlockBlocking()) // .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) // .onErrorResume(this::handleException); } @@ -247,19 +250,24 @@ public class PolicyController { .statusNotificationUri(policyInfo.statusNotificationUri == null ? "" : policyInfo.statusNotificationUri) // .build(); + return ric.getLock().lock(LockType.SHARED, "putPolicy") // + .flatMap(grant -> putPolicy(grant, policy)); + } + + private Mono> putPolicy(Lock.Grant grant, Policy policy) { final boolean isCreate = this.policies.get(policy.getId()) == null; + final Ric ric = policy.getRic(); - return ric.getLock().lock(LockType.SHARED) // - .flatMap(notUsed -> assertRicStateIdle(ric)) // - .flatMap(notUsed -> checkSupportedType(ric, type)) // + return assertRicStateIdle(ric) // + .flatMap(notUsed -> checkSupportedType(ric, policy.getType())) // .flatMap(notUsed -> validateModifiedPolicy(policy)) // .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) // .flatMap(client -> client.putPolicy(policy)) // .doOnNext(notUsed -> policies.put(policy)) // - .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // - .doOnError(trowable -> ric.getLock().unlockBlocking()) // + .doFinally(x -> grant.unlockBlocking()) // .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) // .onErrorResume(this::handleException); + } private Mono> handleException(Throwable throwable) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ServiceController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ServiceController.java index a1106192..375f92f9 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ServiceController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ServiceController.java @@ -174,7 +174,7 @@ public class ServiceController { } @Operation(summary = "Heartbeat indicates that the service is running", - description = "A registerred service must call this in regular intervals to indicate that it is in operation. Absence of this call will lead to that teh service will be deregisterred and all its policies are removed.") + description = "A registerred service must call this in regular intervals to indicate that it is in operation. Absence of this call will lead to that the service will be deregisterred and all its policies are removed.") @ApiResponses(value = { // @ApiResponse(responseCode = "200", description = "Service supervision timer refreshed, OK"), // @ApiResponse(responseCode = "404", description = "The service is not found, needs re-registration", diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java index 6892a503..18507ac2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java @@ -21,9 +21,11 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import java.util.ArrayList; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Queue; + +import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,29 +41,81 @@ import reactor.core.publisher.MonoSink; public class Lock { private static final Logger logger = LoggerFactory.getLogger(Lock.class); - private boolean isExclusive = false; + boolean isExclusive = false; private int lockCounter = 0; - private final List lockRequestQueue = new LinkedList<>(); + final Queue lockRequestQueue = new LinkedList<>(); private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor(); public enum LockType { EXCLUSIVE, SHARED } - /** The caller thread will be blocked util the lock is granted. */ - public synchronized void lockBlocking(LockType locktype) { - while (!tryLock(locktype)) { - this.waitForUnlock(); + /** + * A grant is achieved when the lock is granted. + * It can be used for unlocking. + */ + public static class Grant { + private final Lock lock; + private boolean unlocked = false; + @Getter + private final String label; + + Grant(Lock lock, String label) { + this.lock = lock; + this.label = label; + } + + /** + * reactive unlocking. Submits the lock. + * + * @return the lock + */ + public Mono unlock() { + check(); + return this.lock.unlock(); + } + + /** + * Synchronuous unlocking + */ + public void unlockBlocking() { + check(); + this.lock.unlockBlocking(); + } + + private void check() { + if (unlocked) { + logger.error("Lock already unlocked"); + } + unlocked = true; } } - /** Reactive version. The Lock will be emitted when the lock is granted */ - public synchronized Mono lock(LockType lockType) { + /** + * Reactive lock. The Lock will be emitted when the lock is granted + * + * @param lockType type of lock (exclusive/shared) + * @param label a label that will be attached to the request. Will be passed + * back in the Grant + * @return a Grant that cane be used only to unlock. + */ + public synchronized Mono lock(LockType lockType, String label) { if (tryLock(lockType)) { - return Mono.just(this); + return Mono.just(new Grant(this, label)); } else { - return Mono.create(monoSink -> addToQueue(monoSink, lockType)); + return Mono.create(monoSink -> addToQueue(monoSink, lockType, label)); + } + } + + /** + * A synchronuous variant of locking. The caller thread will be blocked util the + * lock is granted. + */ + public synchronized Grant lockBlocking(LockType locktype, String label) { + while (!tryLock(locktype)) { + this.waitForUnlock(); } + return new Grant(this, label); } public Mono unlock() { @@ -97,19 +151,20 @@ public class Lock { private void processQueuedEntries() { List granted = new ArrayList<>(); - for (Iterator i = lockRequestQueue.iterator(); i.hasNext();) { - LockRequest request = i.next(); + while (!lockRequestQueue.isEmpty()) { + LockRequest request = lockRequestQueue.element(); if (tryLock(request.lockType)) { - i.remove(); + lockRequestQueue.remove(); granted.add(request); + } else { + break; // Avoid starvation } } callbackProcessor.addAll(granted); } - private synchronized void addToQueue(MonoSink callback, LockType lockType) { - lockRequestQueue.add(new LockRequest(callback, lockType, this)); - processQueuedEntries(); + private synchronized void addToQueue(MonoSink callback, LockType lockType, String label) { + lockRequestQueue.add(new LockRequest(callback, lockType, this, label)); } @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop @@ -138,14 +193,16 @@ public class Lock { * Represents a queued lock request */ private static class LockRequest { - final MonoSink callback; + final MonoSink callback; final LockType lockType; final Lock lock; + final String label; - LockRequest(MonoSink callback, LockType lockType, Lock lock) { + LockRequest(MonoSink callback, LockType lockType, Lock lock, String label) { this.callback = callback; this.lockType = lockType; this.lock = lock; + this.label = label; } } @@ -171,7 +228,8 @@ public class Lock { try { while (true) { for (LockRequest request : consume()) { - request.callback.success(request.lock); + Grant g = new Grant(request.lock, request.label); + request.callback.success(g); } waitForNewEntries(); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index b4c6595d..2c037838 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -113,7 +113,7 @@ public class RicSupervision { private Mono checkOneRic(RicData ricData) { return checkRicState(ricData) // - .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) // + .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE, "checkOneRic")) // .flatMap(notUsed -> setRicState(ricData)) // .flatMap(x -> checkRicPolicies(ricData)) // .flatMap(x -> checkRicPolicyTypes(ricData)) // diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 0552df6a..6305abf0 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -85,7 +85,7 @@ public class RicSynchronizationTask { return; } - ric.getLock().lock(LockType.EXCLUSIVE) // + ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") // .flatMap(notUsed -> synchronizeRic(ric)) // .subscribe(new BaseSubscriber() { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java index d605b69e..e40634e2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java @@ -92,13 +92,12 @@ public class ServiceSupervision { @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally private Flux deletePolicy(Policy policy) { Lock lock = policy.getRic().getLock(); - return lock.lock(LockType.SHARED) // + return lock.lock(LockType.SHARED, "ServiceSupervision") // .doOnNext(notUsed -> policies.remove(policy)) // .flatMap(notUsed -> deletePolicyInRic(policy)) .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.getId(), policy.getOwnerServiceId())) // - .doOnNext(notUsed -> lock.unlockBlocking()) // - .doOnError(throwable -> lock.unlockBlocking()) // + .doFinally(notUsed -> lock.unlockBlocking()) // .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.getId(), throwable.getMessage())) // .flatMapMany(notUsed -> Flux.just(policy)) // diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java index c203d0fd..75215613 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java @@ -57,6 +57,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbackInfo; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -178,8 +179,8 @@ class ApplicationTest { @AfterEach void verifyNoRicLocks() { for (Ric ric : this.rics.getRics()) { - ric.getLock().lockBlocking(LockType.EXCLUSIVE); - ric.getLock().unlockBlocking(); + Lock.Grant grant = ric.getLock().lockBlocking(LockType.EXCLUSIVE, ""); + grant.unlockBlocking(); assertThat(ric.getLock().getLockCounter()).isZero(); assertThat(ric.getState()).isEqualTo(Ric.RicState.AVAILABLE); } @@ -276,7 +277,7 @@ class ApplicationTest { // Check that a service callback for the AVAILABLE RIC is invoked final RappSimulatorController.TestResults receivedCallbacks = rAppSimulator.getTestResults(); - await().untilAsserted(() -> assertThat(receivedCallbacks.getReceivedInfo().size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(receivedCallbacks.getReceivedInfo()).hasSize(1)); ServiceCallbackInfo callbackInfo = receivedCallbacks.getReceivedInfo().get(0); assertThat(callbackInfo.ricId).isEqualTo(RIC); assertThat(callbackInfo.eventType).isEqualTo(ServiceCallbackInfo.EventType.AVAILABLE); @@ -305,7 +306,7 @@ class ApplicationTest { supervision.checkAllRics(); waitForRicState(RIC, RicState.AVAILABLE); - await().untilAsserted(() -> assertThat(receivedCallbacks.getReceivedInfo().size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(receivedCallbacks.getReceivedInfo()).hasSize(1)); } @Test @@ -842,7 +843,7 @@ class ApplicationTest { waitForRicState("ric1", RicState.AVAILABLE); final RappSimulatorController.TestResults receivedCallbacks = rAppSimulator.getTestResults(); - await().untilAsserted(() -> assertThat(receivedCallbacks.getReceivedInfo().size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(receivedCallbacks.getReceivedInfo()).hasSize(1)); ServiceCallbackInfo callbackInfo = receivedCallbacks.getReceivedInfo().get(0); assertThat(callbackInfo.ricId).isEqualTo("ric1"); assertThat(callbackInfo.eventType).isEqualTo(ServiceCallbackInfo.EventType.AVAILABLE); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/LockTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/LockTest.java index a63f15f1..79879446 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/LockTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/LockTest.java @@ -21,21 +21,28 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.io.IOException; +import java.lang.invoke.MethodHandles; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @ExtendWith(MockitoExtension.class) class LockTest { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. private void sleep() { try { @@ -45,10 +52,13 @@ class LockTest { } } - private void asynchUnlock(Lock lock) { + private void asynchUnlock(Lock.Grant grant, Lock lock) { + logger.info("Lock {} cnt: {}, exclusive: {}, queue: {}", grant.getLabel(), lock.getLockCounter(), + lock.isExclusive, lock.lockRequestQueue.size()); + Thread thread = new Thread(() -> { sleep(); - lock.unlockBlocking(); + grant.unlockBlocking(); }); thread.start(); } @@ -56,14 +66,15 @@ class LockTest { @Test void testLock() throws IOException, ServiceException { Lock lock = new Lock(); - lock.lockBlocking(LockType.SHARED); - lock.unlockBlocking(); + Lock.Grant grant = lock.lockBlocking(LockType.SHARED, "test"); + grant.unlockBlocking(); + assertThat(grant.getLabel()).isEqualTo("test"); - lock.lockBlocking(LockType.EXCLUSIVE); - asynchUnlock(lock); + grant = lock.lockBlocking(LockType.EXCLUSIVE, ""); + asynchUnlock(grant, lock); - lock.lockBlocking(LockType.SHARED); - lock.unlockBlocking(); + grant = lock.lockBlocking(LockType.SHARED, ""); + grant.unlockBlocking(); assertThat(lock.getLockCounter()).isZero(); } @@ -72,18 +83,17 @@ class LockTest { void testReactiveLock() { Lock lock = new Lock(); - Mono seq = lock.lock(LockType.EXCLUSIVE) // - .flatMap(l -> lock.lock(LockType.EXCLUSIVE)) // - .flatMap(l -> lock.unlock()); + Mono l0 = lock.lock(LockType.EXCLUSIVE, "1").doOnNext(grant -> asynchUnlock(grant, lock)); + Mono l1 = lock.lock(LockType.SHARED, "2").doOnNext(grant -> asynchUnlock(grant, lock)); + Mono l2 = lock.lock(LockType.SHARED, "3").doOnNext(grant -> asynchUnlock(grant, lock)); + Mono l3 = lock.lock(LockType.EXCLUSIVE, "4").doOnNext(grant -> asynchUnlock(grant, lock)); + Mono l4 = lock.lock(LockType.SHARED, "5").doOnNext(grant -> asynchUnlock(grant, lock)); - asynchUnlock(lock); - StepVerifier.create(seq) // + StepVerifier.create(Flux.zip(l0, l1, l2, l3, l4)) // .expectSubscription() // - .expectNext(lock) // + .expectNextCount(1) // .verifyComplete(); - assertThat(lock.getLockCounter()).isZero(); - + await().untilAsserted(() -> assertThat(lock.getLockCounter()).isZero()); } - } diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java index 313d5ddd..f6c93b95 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java @@ -45,6 +45,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ImmutableRicConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; @@ -121,8 +122,8 @@ class RicSupervisionTest { @AfterEach void verifyNoRicLocks() { for (Ric ric : this.rics.getRics()) { - ric.getLock().lockBlocking(LockType.EXCLUSIVE); - ric.getLock().unlockBlocking(); + Lock.Grant grant = ric.getLock().lockBlocking(LockType.EXCLUSIVE, ""); + grant.unlockBlocking(); assertThat(ric.getLock().getLockCounter()).isZero(); } } -- cgit 1.2.3-korg