aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java48
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java19
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy32
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy4
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy9
6 files changed, 86 insertions, 37 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java
new file mode 100644
index 0000000000..61cf939a51
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cache;
+
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.map.IMap;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class CpsAndNcmpLockConfig extends HazelcastCacheConfig {
+
+ // Lock names for different use cases ( to be used as cpsAndNcmpLock keys)
+ public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock";
+
+ private static final MapConfig cpsAndNcmpLockMapConfig = createMapConfig("cpsAndNcmpLockConfig");
+
+ /**
+ * Distributed instance used for locking purpose for various use cases in cps-and-ncmp.
+ * The key of the map entry is name of the lock and should be based on the use case we are locking.
+ *
+ * @return configured map of lock object to have distributed coordination.
+ */
+ @Bean
+ public IMap<String, String> cpsAndNcmpLock() {
+ return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock");
+ }
+
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 3f2bb4f4ef..5b71a8af70 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -21,13 +21,14 @@
package org.onap.cps.ncmp.impl.inventory.sync;
+import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME;
+
import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -46,7 +47,7 @@ public class ModuleSyncWatchdog {
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
- private final Lock workQueueLock;
+ private final IMap<String, String> cpsAndNcmpLock;
private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
@@ -90,14 +91,16 @@ public class ModuleSyncWatchdog {
* So it can be tested without the queue being emptied immediately as the main public method does.
*/
public void populateWorkQueueIfNeeded() {
- if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) {
+ if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) {
+ log.info("Lock acquired by thread : {}", Thread.currentThread().getName());
try {
populateWorkQueue();
if (moduleSyncWorkQueue.isEmpty()) {
setPreviouslyLockedCmHandlesToAdvised();
}
} finally {
- workQueueLock.unlock();
+ cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME);
+ log.info("Lock released by thread : {}", Thread.currentThread().getName());
}
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
index def8f37eb7..d6ac242b30 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
@@ -26,7 +26,6 @@ import com.hazelcast.config.QueueConfig;
import com.hazelcast.config.SetConfig;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
@@ -48,7 +47,6 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
private static final SetConfig moduleSetTagsBeingProcessedConfig
= createSetConfig("moduleSetTagsBeingProcessedConfig");
- private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
/**
* Module Sync Distributed Queue Instance.
@@ -90,21 +88,4 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
public ISet<String> moduleSetTagsBeingProcessed() {
return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed");
}
-
- /**
- * Retrieves a distributed lock used to control access to the work queue for module synchronization.
- * This lock ensures that the population and modification of the work queue are thread-safe and
- * protected from concurrent access across different nodes in the distributed system.
- * The lock guarantees that only one instance of the application can populate or modify the
- * module sync work queue at a time, preventing race conditions and potential data inconsistencies.
- * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides
- * strong consistency guarantees for distributed operations.
- *
- * @return a {@link Lock} instance used for synchronizing access to the work queue.
- */
- @Bean
- public Lock workQueueLock() {
- // TODO Method below does not use commonQueueConfig for creating lock (Refactor later)
- return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE);
- }
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 4cf07e4c24..a9b88c2d3b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,11 +44,11 @@ class ModuleSyncWatchdogSpec extends Specification {
def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
- def mockWorkQueueLock = Mock(Lock)
+ def mockCpsAndNcmpLock = Mock(IMap<String,String>)
def spiedSleeper = Spy(Sleeper)
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper)
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
void setup() {
spiedAsyncTaskExecutor.setupThreadPool()
@@ -59,14 +59,16 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles)
and: 'module sync utilities returns no failed (locked) cm handles'
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
- and: 'the work queue is not locked'
- mockWorkQueueLock.tryLock() >> true
+ and: 'the work queue can be locked'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
and: 'the executor has enough available threads'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+ and: 'the executing thread is unlocked'
+ 1 * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following parameter are used'
scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
'none at all' | 0 || 0
@@ -80,8 +82,8 @@ class ModuleSyncWatchdogSpec extends Specification {
def 'Module sync cm handles starts with no available threads.'() {
given: 'module sync utilities returns a advise cm handles'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
- and: 'the work queue is not locked'
- mockWorkQueueLock.tryLock() >> true
+ and: 'the work queue can be locked'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
and: 'the executor first has no threads but has one thread on the second attempt'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
@@ -93,8 +95,8 @@ class ModuleSyncWatchdogSpec extends Specification {
def 'Module sync advised cm handle already handled by other thread.'() {
given: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
- and: 'the work queue is not locked'
- mockWorkQueueLock.tryLock() >> true
+ and: 'the work queue can be locked'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
and: 'the executor has a thread available'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
@@ -131,16 +133,18 @@ class ModuleSyncWatchdogSpec extends Specification {
def 'Module Sync Locking.'() {
given: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
- and: 'can lock is : #canLock'
- mockWorkQueueLock.tryLock() >> canLock
+ and: 'can be locked is : #canLock'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock
when: 'attempt to populate the work queue'
objectUnderTest.populateWorkQueueIfNeeded()
then: 'the queue remains empty is #expectQueueRemainsEmpty'
assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
+ and: 'unlock is called only when thread is able to enter the critical section'
+ expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following lock states are applied'
- canLock | expectQueueRemainsEmpty
- false | true
- true | false
+ canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock
+ false || true || 0
+ true || false || 1
}
def 'Sleeper gets interrupted.'() {
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
index 9fc36331e2..16b4460492 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
@@ -22,6 +22,7 @@
package org.onap.cps.integration.base
import com.hazelcast.collection.ISet
+import com.hazelcast.map.IMap
import okhttp3.mockwebserver.MockWebServer
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.api.CpsDataService
@@ -123,6 +124,9 @@ abstract class CpsIntegrationSpecBase extends Specification {
BlockingQueue<String> moduleSyncWorkQueue
@Autowired
+ IMap<String, String> cpsAndNcmpLock
+
+ @Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
index 43bcbdb4f4..a6e56ab22d 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
@@ -151,6 +151,15 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
}
}
+ def populateQueueWithoutDelayCallable = () -> {
+ try {
+ objectUnderTest.populateWorkQueueIfNeeded()
+ return 'task acquired the lock first'
+ } catch (InterruptedException e) {
+ e.printStackTrace()
+ }
+ }
+
def populateQueueWithDelay = () -> {
try {
Thread.sleep(10)