diff options
author | Toine Siebelink <toine.siebelink@est.tech> | 2024-12-19 16:09:23 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2024-12-19 16:09:23 +0000 |
commit | c7c642eac70ae3f226ceeb250f8d4b8e170e78ca (patch) | |
tree | 6c8febb7de6a6d27a1f4823c9bb1af98d71b92f2 | |
parent | 2cb739cbaf8e9bf76f867a760e004bf53ea89024 (diff) | |
parent | a7206655a004659bd1251c70db0acfd0f58e40d4 (diff) |
Merge "Cps and Ncmp distributed lock for various use cases"
6 files changed, 86 insertions, 37 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java new file mode 100644 index 0000000000..61cf939a51 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cache; + +import com.hazelcast.config.MapConfig; +import com.hazelcast.map.IMap; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CpsAndNcmpLockConfig extends HazelcastCacheConfig { + + // Lock names for different use cases ( to be used as cpsAndNcmpLock keys) + public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock"; + + private static final MapConfig cpsAndNcmpLockMapConfig = createMapConfig("cpsAndNcmpLockConfig"); + + /** + * Distributed instance used for locking purpose for various use cases in cps-and-ncmp. + * The key of the map entry is name of the lock and should be based on the use case we are locking. + * + * @return configured map of lock object to have distributed coordination. + */ + @Bean + public IMap<String, String> cpsAndNcmpLock() { + return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock"); + } + + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java index 3f2bb4f4ef..5b71a8af70 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java @@ -21,13 +21,14 @@ package org.onap.cps.ncmp.impl.inventory.sync; +import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME; + import com.hazelcast.map.IMap; import java.util.Collection; import java.util.HashSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -46,7 +47,7 @@ public class ModuleSyncWatchdog { private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private final AsyncTaskExecutor asyncTaskExecutor; - private final Lock workQueueLock; + private final IMap<String, String> cpsAndNcmpLock; private final Sleeper sleeper; private static final int MODULE_SYNC_BATCH_SIZE = 100; @@ -90,14 +91,16 @@ public class ModuleSyncWatchdog { * So it can be tested without the queue being emptied immediately as the main public method does. */ public void populateWorkQueueIfNeeded() { - if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) { + if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) { + log.info("Lock acquired by thread : {}", Thread.currentThread().getName()); try { populateWorkQueue(); if (moduleSyncWorkQueue.isEmpty()) { setPreviouslyLockedCmHandlesToAdvised(); } } finally { - workQueueLock.unlock(); + cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME); + log.info("Lock released by thread : {}", Thread.currentThread().getName()); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index def8f37eb7..d6ac242b30 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -26,7 +26,6 @@ import com.hazelcast.config.QueueConfig; import com.hazelcast.config.SetConfig; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig; import org.springframework.context.annotation.Bean; @@ -48,7 +47,6 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); private static final SetConfig moduleSetTagsBeingProcessedConfig = createSetConfig("moduleSetTagsBeingProcessedConfig"); - private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock"; /** * Module Sync Distributed Queue Instance. @@ -90,21 +88,4 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { public ISet<String> moduleSetTagsBeingProcessed() { return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed"); } - - /** - * Retrieves a distributed lock used to control access to the work queue for module synchronization. - * This lock ensures that the population and modification of the work queue are thread-safe and - * protected from concurrent access across different nodes in the distributed system. - * The lock guarantees that only one instance of the application can populate or modify the - * module sync work queue at a time, preventing race conditions and potential data inconsistencies. - * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides - * strong consistency guarantees for distributed operations. - * - * @return a {@link Lock} instance used for synchronizing access to the work queue. - */ - @Bean - public Lock workQueueLock() { - // TODO Method below does not use commonQueueConfig for creating lock (Refactor later) - return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE); - } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy index 4cf07e4c24..a9b88c2d3b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,11 +44,11 @@ class ModuleSyncWatchdogSpec extends Specification { def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) - def mockWorkQueueLock = Mock(Lock) + def mockCpsAndNcmpLock = Mock(IMap<String,String>) def spiedSleeper = Spy(Sleeper) - def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper) + def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper) void setup() { spiedAsyncTaskExecutor.setupThreadPool() @@ -59,14 +59,16 @@ class ModuleSyncWatchdogSpec extends Specification { mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles) and: 'module sync utilities returns no failed (locked) cm handles' mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> [] - and: 'the work queue is not locked' - mockWorkQueueLock.tryLock() >> true + and: 'the work queue can be locked' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> true and: 'the executor has enough available threads' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3 when: ' module sync is started' objectUnderTest.moduleSyncAdvisedCmHandles() then: 'it performs #expectedNumberOfTaskExecutions tasks' expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) + and: 'the executing thread is unlocked' + 1 * mockCpsAndNcmpLock.unlock('workQueueLock') where: 'the following parameter are used' scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions 'none at all' | 0 || 0 @@ -80,8 +82,8 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module sync cm handles starts with no available threads.'() { given: 'module sync utilities returns a advise cm handles' mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1) - and: 'the work queue is not locked' - mockWorkQueueLock.tryLock() >> true + and: 'the work queue can be locked' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> true and: 'the executor first has no threads but has one thread on the second attempt' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ] when: ' module sync is started' @@ -93,8 +95,8 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module sync advised cm handle already handled by other thread.'() { given: 'module sync utilities returns an advised cm handle' mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1) - and: 'the work queue is not locked' - mockWorkQueueLock.tryLock() >> true + and: 'the work queue can be locked' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> true and: 'the executor has a thread available' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1 and: 'the semaphore cache indicates the cm handle is already being processed' @@ -131,16 +133,18 @@ class ModuleSyncWatchdogSpec extends Specification { def 'Module Sync Locking.'() { given: 'module sync utilities returns an advised cm handle' mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1) - and: 'can lock is : #canLock' - mockWorkQueueLock.tryLock() >> canLock + and: 'can be locked is : #canLock' + mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock when: 'attempt to populate the work queue' objectUnderTest.populateWorkQueueIfNeeded() then: 'the queue remains empty is #expectQueueRemainsEmpty' assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty + and: 'unlock is called only when thread is able to enter the critical section' + expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock') where: 'the following lock states are applied' - canLock | expectQueueRemainsEmpty - false | true - true | false + canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock + false || true || 0 + true || false || 1 } def 'Sleeper gets interrupted.'() { diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy index 9fc36331e2..16b4460492 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy @@ -22,6 +22,7 @@ package org.onap.cps.integration.base import com.hazelcast.collection.ISet +import com.hazelcast.map.IMap import okhttp3.mockwebserver.MockWebServer import org.onap.cps.api.CpsAnchorService import org.onap.cps.api.CpsDataService @@ -123,6 +124,9 @@ abstract class CpsIntegrationSpecBase extends Specification { BlockingQueue<String> moduleSyncWorkQueue @Autowired + IMap<String, String> cpsAndNcmpLock + + @Autowired JsonObjectMapper jsonObjectMapper @Autowired diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy index 43bcbdb4f4..a6e56ab22d 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy @@ -151,6 +151,15 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { } } + def populateQueueWithoutDelayCallable = () -> { + try { + objectUnderTest.populateWorkQueueIfNeeded() + return 'task acquired the lock first' + } catch (InterruptedException e) { + e.printStackTrace() + } + } + def populateQueueWithDelay = () -> { try { Thread.sleep(10) |