diff options
author | mpriyank <priyank.maheshwari@est.tech> | 2024-11-27 14:43:28 +0000 |
---|---|---|
committer | mpriyank <priyank.maheshwari@est.tech> | 2024-12-19 15:38:36 +0000 |
commit | a7206655a004659bd1251c70db0acfd0f58e40d4 (patch) | |
tree | 852a2331753c722e46c03ac6b508a14b646f24e1 /cps-ncmp-service | |
parent | 40578d56733247540a09b6bbf000181b46b5c1fc (diff) |
Cps and Ncmp distributed lock for various use cases
- introduced cpsAndNcmpLock to be used for any use case needing
coordination. Since it can be used for any use case hence placing the
class accordingly.
- currently lock is being used for populating workQueue.
- Removed FencedLock as it was part of CPSubsystem which is moved to
hazelcast-enterprise in 5.5.* version onwards.
- added info level logging statement to verify just one thread at a time
in the critical section
- Note : integration test to be part of a separate patch.
Issue-ID: CPS-2479
Change-Id: I0f33c7232786c517383e5093fda91fd9a1839021
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'cps-ncmp-service')
4 files changed, 73 insertions, 37 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java new file mode 100644 index 0000000000..61cf939a51 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cache; + +import com.hazelcast.config.MapConfig; +import com.hazelcast.map.IMap; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CpsAndNcmpLockConfig extends HazelcastCacheConfig { + + // Lock names for different use cases ( to be used as cpsAndNcmpLock keys) + public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock"; + + private static final MapConfig cpsAndNcmpLockMapConfig = createMapConfig("cpsAndNcmpLockConfig"); + + /** + * Distributed instance used for locking purpose for various use cases in cps-and-ncmp. + * The key of the map entry is name of the lock and should be based on the use case we are locking. + * + * @return configured map of lock object to have distributed coordination. + */ + @Bean + public IMap<String, String> cpsAndNcmpLock() { + return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock"); + } + + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java index 3f2bb4f4ef..5b71a8af70 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java @@ -21,13 +21,14 @@ package org.onap.cps.ncmp.impl.inventory.sync; +import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME; + import com.hazelcast.map.IMap; import java.util.Collection; import java.util.HashSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -46,7 +47,7 @@ public class ModuleSyncWatchdog { private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private final AsyncTaskExecutor asyncTaskExecutor; - private final Lock workQueueLock; + private final IMap<String, String> cpsAndNcmpLock; private final Sleeper sleeper; private static final int MODULE_SYNC_BATCH_SIZE = 100; @@ -90,14 +91,16 @@ public class ModuleSyncWatchdog { * So it can be tested without the queue being emptied immediately as the main public method does. */ public void populateWorkQueueIfNeeded() { - if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) { + if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) { + log.info("Lock acquired by thread : {}", Thread.currentThread().getName()); try { populateWorkQueue(); if (moduleSyncWorkQueue.isEmpty()) { setPreviouslyLockedCmHandlesToAdvised(); } } finally { - workQueueLock.unlock(); + cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME); + log.info("Lock released by thread : {}", Thread.currentThread().getName()); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index def8f37eb7..d6ac242b30 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -26,7 +26,6 @@ import com.hazelcast.config.QueueConfig; import com.hazelcast.config.SetConfig; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig; import org.springframework.context.annotation.Bean; @@ -48,7 +47,6 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); private static final SetConfig moduleSetTagsBeingProcessedConfig = createSetConfig("moduleSetTagsBeingProcessedConfig"); - private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock"; /** * Module Sync Distributed Queue Instance. @@ -90,21 +88,4 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { public ISet<String> moduleSetTagsBeingProcessed() { return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed"); } - - /** - * Retrieves a distributed lock used to control access to the work queue for module synchronization. - * This lock ensures that the population and modification of the work queue are thread-safe and - * protected from concurrent access across different nodes in the distributed system. - * The lock guarantees that only one instance of the application can populate or modify the - * module sync work queue at a time, preventing race conditions and potential data inconsistencies. - * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides - * strong consistency guarantees for distributed operations. - * - * @return a {@link Lock} instance used for synchronizing access to the work queue. - */ - @Bean - public Lock workQueueLock() { - // TODO Method below does not use commonQueueConfig for creating lock (Refactor later) - return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE); - } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy index 4cf07e4c24..a9b88c2d3b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,11 +44,11 @@ class ModuleSyncWatchdogSpec extends Specification { def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) - def mockWorkQueueLock = Mock(Lock) + def mockCpsAndNcmpLock = Mock(IMap<String,String>) def spiedSleeper = Spy(Sleeper) - def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper) + def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper) void setup() { spiedAsyncTaskExecutor.setupThreadPool() @@ -59,14 +59,16 @@ class ModuleSyncWatchdogSpec extends Specification { mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles) and: 'module sync utilities returns no failed (locked) cm handles' mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> [] - and: 'the work queue is not locked' - mockWorkQueueLock.tryLock() >> true + and: 'the work queue can be locked' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> true and: 'the executor has enough available threads' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3 when: ' module sync is started' objectUnderTest.moduleSyncAdvisedCmHandles() then: 'it performs #expectedNumberOfTaskExecutions tasks' expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) + and: 'the executing thread is unlocked' + 1 * mockCpsAndNcmpLock.unlock('workQueueLock') where: 'the following parameter are used' scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions 'none at all' | 0 || 0 @@ -80,8 +82,8 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module sync cm handles starts with no available threads.'() { given: 'module sync utilities returns a advise cm handles' mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1) - and: 'the work queue is not locked' - mockWorkQueueLock.tryLock() >> true + and: 'the work queue can be locked' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> true and: 'the executor first has no threads but has one thread on the second attempt' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ] when: ' module sync is started' @@ -93,8 +95,8 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module sync advised cm handle already handled by other thread.'() { given: 'module sync utilities returns an advised cm handle' mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1) - and: 'the work queue is not locked' - mockWorkQueueLock.tryLock() >> true + and: 'the work queue can be locked' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> true and: 'the executor has a thread available' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1 and: 'the semaphore cache indicates the cm handle is already being processed' @@ -131,16 +133,18 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module Sync Locking.'() { given: 'module sync utilities returns an advised cm handle' mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1) - and: 'can lock is : #canLock' - mockWorkQueueLock.tryLock() >> canLock + and: 'can be locked is : #canLock' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock when: 'attempt to populate the work queue' objectUnderTest.populateWorkQueueIfNeeded() then: 'the queue remains empty is #expectQueueRemainsEmpty' assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty + and: 'unlock is called only when thread is able to enter the critical section' + expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock') where: 'the following lock states are applied' - canLock | expectQueueRemainsEmpty - false | true - true | false + canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock + false || true || 0 + true || false || 1 } def 'Sleeper gets interrupted.'() { |