diff options
Diffstat (limited to 'cps-ncmp-service/src')
2 files changed, 50 insertions, 13 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index e2330a7135..6ff426d66f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -65,16 +65,17 @@ public class ModuleSyncWatchdog { public void moduleSyncAdvisedCmHandles() { log.info("Processing module sync watchdog waking up."); populateWorkQueueIfNeeded(); - final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel(); while (!moduleSyncWorkQueue.isEmpty()) { - if (batchCounter.get() <= asyncTaskParallelismLevel) { + if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) { final Collection<DataNode> nextBatch = prepareNextBatch(); log.info("Processing module sync batch of {}. {} batch(es) active.", - nextBatch.size(), batchCounter.get()); - asyncTaskExecutor.executeTask(() -> - moduleSyncTasks.performModuleSync(nextBatch, batchCounter), + nextBatch.size(), batchCounter.get()); + if (!nextBatch.isEmpty()) { + asyncTaskExecutor.executeTask(() -> + moduleSyncTasks.performModuleSync(nextBatch, batchCounter), ASYNC_TASK_TIMEOUT_IN_MILLISECONDS); - batchCounter.getAndIncrement(); + batchCounter.getAndIncrement(); + } } else { preventBusyWait(); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index a8bbf7c483..bb730fc34d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation + * Copyright (C) 2022-2023 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,17 +36,16 @@ class ModuleSyncWatchdogSpec extends Specification { def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) - def stubModuleSyncStartedOnCmHandles = Stub(IMap<String, Object>) + def mockModuleSyncStartedOnCmHandles = Mock(IMap<String, Object>) def mockModuleSyncTasks = Mock(ModuleSyncTasks) def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) - def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , stubModuleSyncStartedOnCmHandles, - mockModuleSyncTasks, spiedAsyncTaskExecutor) + def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor) void setup() { - spiedAsyncTaskExecutor.setupThreadPool(); + spiedAsyncTaskExecutor.setupThreadPool() } def 'Module sync advised cm handles with #scenario.'() { @@ -58,7 +57,7 @@ class ModuleSyncWatchdogSpec extends Specification { objectUnderTest.moduleSyncAdvisedCmHandles() then: 'it performs #expectedNumberOfTaskExecutions tasks' expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) - where: ' the following parameter are used' + where: 'the following parameter are used' scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions 'less then 1 batch' | 1 || 1 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 @@ -67,11 +66,48 @@ class ModuleSyncWatchdogSpec extends Specification { 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 } + def 'Module sync advised cm handles starts with no available threads.'() { + given: 'sync utilities returns a advise cm handles' + mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1) + and: 'the executor first has no threads but has one thread on the second attempt' + spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ] + when: ' module sync is started' + objectUnderTest.moduleSyncAdvisedCmHandles() + then: 'it performs one task' + 1 * spiedAsyncTaskExecutor.executeTask(*_) + } + + def 'Module sync advised cm handles already handled.'() { + given: 'sync utilities returns a advise cm handles' + mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1) + and: 'the executor has a thread available' + spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1 + and: 'the semaphore cache indicates the cm handle is already being processed' + mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started' + when: ' module sync is started' + objectUnderTest.moduleSyncAdvisedCmHandles() + then: 'it does NOT execute a task to process the (empty) batch' + 0 * spiedAsyncTaskExecutor.executeTask(*_) + } + + def 'Module sync with previous cm handle(s) left in work queue.'() { + given: 'there is still a cm handle in the queue' + moduleSyncWorkQueue.offer(new DataNode()) + and: 'sync utilities returns many advise cm handles' + mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(500) + and: 'the executor has plenty threads available' + spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10 + when: ' module sync is started' + objectUnderTest.moduleSyncAdvisedCmHandles() + then: 'it does executes only one task to process the remaining handle in the queue' + 1 * spiedAsyncTaskExecutor.executeTask(*_) + } + def 'Reset failed cm handles.'() { given: 'sync utilities returns failed cm handles' def failedCmHandles = [new YangModelCmHandle()] mockSyncUtils.getModuleSyncFailedCmHandles() >> failedCmHandles - when: ' reset failed cm handles is started' + when: 'reset failed cm handles is started' objectUnderTest.resetPreviouslyFailedCmHandles() then: 'it is delegated to the module sync task (service)' 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles) |