aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java21
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java66
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java19
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy10
5 files changed, 81 insertions, 37 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index c6deb79d4d..e627f8f894 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -95,22 +95,21 @@ public class ModuleSyncTasks {
}
/**
- * Resets the state of failed CM handles and updates their status to ADVISED for retry.
-
- * This method processes a collection of failed CM handles, logs their lock reason, and resets their state
+ * Set the state of CM handles to ADVISED.
+ * This method processes a collection of CM handles, logs their lock reason, and resets their state
* to ADVISED. Once reset, it updates the CM handle states in a batch to allow for re-attempt by the module-sync
* watchdog.
*
- * @param failedCmHandles a collection of CM handles that have failed and need their state reset
+ * @param yangModelCmHandles a collection of CM handles that needs their state reset
*/
- public void resetFailedCmHandles(final Collection<YangModelCmHandle> failedCmHandles) {
- final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size());
- for (final YangModelCmHandle failedCmHandle : failedCmHandles) {
- final CompositeState compositeState = failedCmHandle.getCompositeState();
- final String resetCmHandleId = failedCmHandle.getId();
+ public void setCmHandlesToAdvised(final Collection<YangModelCmHandle> yangModelCmHandles) {
+ final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(yangModelCmHandles.size());
+ for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) {
+ final CompositeState compositeState = yangModelCmHandle.getCompositeState();
+ final String resetCmHandleId = yangModelCmHandle.getId();
log.debug("Resetting CM handle {} state to ADVISED for retry by the module-sync watchdog. Lock reason: {}",
- failedCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name());
- cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED);
+ yangModelCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name());
+ cmHandleStatePerCmHandle.put(yangModelCmHandle, CmHandleState.ADVISED);
removeResetCmHandleFromModuleSyncMap(resetCmHandleId);
}
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index bc7d6cdf67..4061298cd0 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -45,6 +46,8 @@ public class ModuleSyncWatchdog {
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
+ private final Lock workQueueLock;
+
private static final int MODULE_SYNC_BATCH_SIZE = 100;
private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
@@ -60,7 +63,7 @@ public class ModuleSyncWatchdog {
*/
@Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
public void moduleSyncAdvisedCmHandles() {
- log.info("Processing module sync watchdog waking up.");
+ log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
@@ -79,21 +82,9 @@ public class ModuleSyncWatchdog {
}
}
- /**
- * Find any failed (locked) cm handles and change state back to 'ADVISED'.
- */
- @Scheduled(fixedDelayString = "${ncmp.timers.locked-modules-sync.sleep-time-ms:15000}")
- public void resetPreviouslyFailedCmHandles() {
- log.info("Processing module sync retry-watchdog waking up.");
- final Collection<YangModelCmHandle> failedCmHandles
- = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade();
- log.info("Retrying {} cmHandles", failedCmHandles.size());
- moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
- }
-
private void preventBusyWait() {
try {
- log.info("Busy waiting now");
+ log.debug("Busy waiting now");
TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
@@ -102,14 +93,46 @@ public class ModuleSyncWatchdog {
private void populateWorkQueueIfNeeded() {
if (moduleSyncWorkQueue.isEmpty()) {
- final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
- log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size());
- for (final DataNode advisedCmHandle : advisedCmHandles) {
- if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
- log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
+ if (workQueueLock.tryLock()) {
+ try {
+ populateWorkQueue();
+ if (moduleSyncWorkQueue.isEmpty()) {
+ setPreviouslyLockedCmHandlesToAdvised();
+ }
+ } finally {
+ workQueueLock.unlock();
}
}
- log.info("Work Queue Size : {}", moduleSyncWorkQueue.size());
+ }
+ }
+
+ private void populateWorkQueue() {
+ final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
+ if (advisedCmHandles.isEmpty()) {
+ log.debug("No advised CM handles found in DB.");
+ } else {
+ log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size());
+ advisedCmHandles.forEach(advisedCmHandle -> {
+ final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id"));
+ if (moduleSyncWorkQueue.offer(advisedCmHandle)) {
+ log.info("CM handle {} added to the work queue.", cmHandleId);
+ } else {
+ log.warn("Failed to add CM handle {} to the work queue.", cmHandleId);
+ }
+ });
+ log.info("Work queue contains {} items.", moduleSyncWorkQueue.size());
+ }
+ }
+
+ private void setPreviouslyLockedCmHandlesToAdvised() {
+ final Collection<YangModelCmHandle> lockedCmHandles
+ = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade();
+ if (lockedCmHandles.isEmpty()) {
+ log.debug("No locked CM handles found in DB.");
+ } else {
+ log.info("Found {} Locked CM Handles. Changing state to Advise to retry syncing them again.",
+ lockedCmHandles.size());
+ moduleSyncTasks.setCmHandlesToAdvised(lockedCmHandles);
}
}
@@ -130,8 +153,7 @@ public class ModuleSyncWatchdog {
nextBatch.add(batchCandidate);
}
}
- log.debug("nextBatch size : {}", nextBatch.size());
+ log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
-
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
index c5fae0d166..1f33cc349d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
@@ -24,6 +24,7 @@ import com.hazelcast.config.MapConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.onap.cps.spi.model.DataNode;
@@ -43,6 +44,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig");
private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
+ private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
/**
* Module Sync Distributed Queue Instance.
@@ -74,4 +76,21 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
public IMap<String, Boolean> dataSyncSemaphores() {
return getOrCreateHazelcastInstance(dataSyncSemaphoresConfig).getMap("dataSyncSemaphores");
}
+
+ /**
+ * Retrieves a distributed lock used to control access to the work queue for module synchronization.
+ * This lock ensures that the population and modification of the work queue are thread-safe and
+ * protected from concurrent access across different nodes in the distributed system.
+ * The lock guarantees that only one instance of the application can populate or modify the
+ * module sync work queue at a time, preventing race conditions and potential data inconsistencies.
+ * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides
+ * strong consistency guarantees for distributed operations.
+ *
+ * @return a {@link Lock} instance used for synchronizing access to the work queue.
+ */
+ @Bean
+ public Lock workQueueLock() {
+ // TODO Method below does not use commonQueueConfig for creating lock (Refactor later)
+ return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE);
+ }
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
index 160744a7d7..4d715d28c9 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -136,7 +136,7 @@ class ModuleSyncTasksSpec extends Specification {
moduleSyncStartedOnCmHandles.put('cm-handle-1', 'started')
moduleSyncStartedOnCmHandles.put('cm-handle-2', 'started')
when: 'resetting failed cm handles'
- objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2])
+ objectUnderTest.setCmHandlesToAdvised([yangModelCmHandle1, yangModelCmHandle2])
then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(expectedCmHandleStatePerCmHandle)
and: 'after reset performed progress map is empty'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 155edc8bc6..3064a78ff9 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -27,6 +27,7 @@ import org.onap.cps.spi.model.DataNode
import spock.lang.Specification
import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.locks.Lock
class ModuleSyncWatchdogSpec extends Specification {
@@ -42,10 +43,13 @@ class ModuleSyncWatchdogSpec extends Specification {
def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
- def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor)
+ def mockWorkQueueLock = Mock(Lock)
+
+ def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock)
void setup() {
spiedAsyncTaskExecutor.setupThreadPool()
+ mockWorkQueueLock.tryLock() >> true
}
def 'Module sync advised cm handles with #scenario.'() {
@@ -108,9 +112,9 @@ class ModuleSyncWatchdogSpec extends Specification {
def failedCmHandles = [new YangModelCmHandle()]
mockSyncUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
when: 'reset failed cm handles is started'
- objectUnderTest.resetPreviouslyFailedCmHandles()
+ objectUnderTest.setPreviouslyLockedCmHandlesToAdvised()
then: 'it is delegated to the module sync task (service)'
- 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles)
+ 1 * mockModuleSyncTasks.setCmHandlesToAdvised(failedCmHandles)
}
def createDataNodes(numberOfDataNodes) {