diff options
Diffstat (limited to 'cps-ncmp-service/src')
5 files changed, 81 insertions, 37 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index c6deb79d4d..e627f8f894 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -95,22 +95,21 @@ public class ModuleSyncTasks { } /** - * Resets the state of failed CM handles and updates their status to ADVISED for retry. - - * This method processes a collection of failed CM handles, logs their lock reason, and resets their state + * Set the state of CM handles to ADVISED. + * This method processes a collection of CM handles, logs their lock reason, and resets their state * to ADVISED. Once reset, it updates the CM handle states in a batch to allow for re-attempt by the module-sync * watchdog. * - * @param failedCmHandles a collection of CM handles that have failed and need their state reset + * @param yangModelCmHandles a collection of CM handles that needs their state reset */ - public void resetFailedCmHandles(final Collection<YangModelCmHandle> failedCmHandles) { - final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size()); - for (final YangModelCmHandle failedCmHandle : failedCmHandles) { - final CompositeState compositeState = failedCmHandle.getCompositeState(); - final String resetCmHandleId = failedCmHandle.getId(); + public void setCmHandlesToAdvised(final Collection<YangModelCmHandle> yangModelCmHandles) { + final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(yangModelCmHandles.size()); + for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) { + final CompositeState compositeState = yangModelCmHandle.getCompositeState(); + final String resetCmHandleId = yangModelCmHandle.getId(); log.debug("Resetting CM handle {} state to ADVISED for retry by the module-sync watchdog. Lock reason: {}", - failedCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name()); - cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED); + yangModelCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name()); + cmHandleStatePerCmHandle.put(yangModelCmHandle, CmHandleState.ADVISED); removeResetCmHandleFromModuleSyncMap(resetCmHandleId); } lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java index bc7d6cdf67..4061298cd0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -45,6 +46,8 @@ public class ModuleSyncWatchdog { private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private final AsyncTaskExecutor asyncTaskExecutor; + private final Lock workQueueLock; + private static final int MODULE_SYNC_BATCH_SIZE = 100; private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; @@ -60,7 +63,7 @@ public class ModuleSyncWatchdog { */ @Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { - log.info("Processing module sync watchdog waking up."); + log.debug("Processing module sync watchdog waking up."); populateWorkQueueIfNeeded(); while (!moduleSyncWorkQueue.isEmpty()) { if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) { @@ -79,21 +82,9 @@ public class ModuleSyncWatchdog { } } - /** - * Find any failed (locked) cm handles and change state back to 'ADVISED'. - */ - @Scheduled(fixedDelayString = "${ncmp.timers.locked-modules-sync.sleep-time-ms:15000}") - public void resetPreviouslyFailedCmHandles() { - log.info("Processing module sync retry-watchdog waking up."); - final Collection<YangModelCmHandle> failedCmHandles - = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade(); - log.info("Retrying {} cmHandles", failedCmHandles.size()); - moduleSyncTasks.resetFailedCmHandles(failedCmHandles); - } - private void preventBusyWait() { try { - log.info("Busy waiting now"); + log.debug("Busy waiting now"); TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); @@ -102,14 +93,46 @@ public class ModuleSyncWatchdog { private void populateWorkQueueIfNeeded() { if (moduleSyncWorkQueue.isEmpty()) { - final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles(); - log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size()); - for (final DataNode advisedCmHandle : advisedCmHandles) { - if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { - log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); + if (workQueueLock.tryLock()) { + try { + populateWorkQueue(); + if (moduleSyncWorkQueue.isEmpty()) { + setPreviouslyLockedCmHandlesToAdvised(); + } + } finally { + workQueueLock.unlock(); } } - log.info("Work Queue Size : {}", moduleSyncWorkQueue.size()); + } + } + + private void populateWorkQueue() { + final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles(); + if (advisedCmHandles.isEmpty()) { + log.debug("No advised CM handles found in DB."); + } else { + log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size()); + advisedCmHandles.forEach(advisedCmHandle -> { + final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id")); + if (moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.info("CM handle {} added to the work queue.", cmHandleId); + } else { + log.warn("Failed to add CM handle {} to the work queue.", cmHandleId); + } + }); + log.info("Work queue contains {} items.", moduleSyncWorkQueue.size()); + } + } + + private void setPreviouslyLockedCmHandlesToAdvised() { + final Collection<YangModelCmHandle> lockedCmHandles + = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade(); + if (lockedCmHandles.isEmpty()) { + log.debug("No locked CM handles found in DB."); + } else { + log.info("Found {} Locked CM Handles. Changing state to Advise to retry syncing them again.", + lockedCmHandles.size()); + moduleSyncTasks.setCmHandlesToAdvised(lockedCmHandles); } } @@ -130,8 +153,7 @@ public class ModuleSyncWatchdog { nextBatch.add(batchCandidate); } } - log.debug("nextBatch size : {}", nextBatch.size()); + log.info("nextBatch size : {}", nextBatch.size()); return nextBatch; } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index c5fae0d166..1f33cc349d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -24,6 +24,7 @@ import com.hazelcast.config.MapConfig; import com.hazelcast.config.QueueConfig; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig; import org.onap.cps.spi.model.DataNode; @@ -43,6 +44,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig"); private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig"); private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); + private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock"; /** * Module Sync Distributed Queue Instance. @@ -74,4 +76,21 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { public IMap<String, Boolean> dataSyncSemaphores() { return getOrCreateHazelcastInstance(dataSyncSemaphoresConfig).getMap("dataSyncSemaphores"); } + + /** + * Retrieves a distributed lock used to control access to the work queue for module synchronization. + * This lock ensures that the population and modification of the work queue are thread-safe and + * protected from concurrent access across different nodes in the distributed system. + * The lock guarantees that only one instance of the application can populate or modify the + * module sync work queue at a time, preventing race conditions and potential data inconsistencies. + * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides + * strong consistency guarantees for distributed operations. + * + * @return a {@link Lock} instance used for synchronizing access to the work queue. + */ + @Bean + public Lock workQueueLock() { + // TODO Method below does not use commonQueueConfig for creating lock (Refactor later) + return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE); + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy index 160744a7d7..4d715d28c9 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy @@ -136,7 +136,7 @@ class ModuleSyncTasksSpec extends Specification { moduleSyncStartedOnCmHandles.put('cm-handle-1', 'started') moduleSyncStartedOnCmHandles.put('cm-handle-2', 'started') when: 'resetting failed cm handles' - objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2]) + objectUnderTest.setCmHandlesToAdvised([yangModelCmHandle1, yangModelCmHandle2]) then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry' 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(expectedCmHandleStatePerCmHandle) and: 'after reset performed progress map is empty' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy index 155edc8bc6..3064a78ff9 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -27,6 +27,7 @@ import org.onap.cps.spi.model.DataNode import spock.lang.Specification import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.locks.Lock class ModuleSyncWatchdogSpec extends Specification { @@ -42,10 +43,13 @@ class ModuleSyncWatchdogSpec extends Specification { def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) - def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor) + def mockWorkQueueLock = Mock(Lock) + + def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock) void setup() { spiedAsyncTaskExecutor.setupThreadPool() + mockWorkQueueLock.tryLock() >> true } def 'Module sync advised cm handles with #scenario.'() { @@ -108,9 +112,9 @@ class ModuleSyncWatchdogSpec extends Specification { def failedCmHandles = [new YangModelCmHandle()] mockSyncUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles when: 'reset failed cm handles is started' - objectUnderTest.resetPreviouslyFailedCmHandles() + objectUnderTest.setPreviouslyLockedCmHandlesToAdvised() then: 'it is delegated to the module sync task (service)' - 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles) + 1 * mockModuleSyncTasks.setCmHandlesToAdvised(failedCmHandles) } def createDataNodes(numberOfDataNodes) { |