aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java77
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java38
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy63
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy18
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy49
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml4
8 files changed, 21 insertions, 274 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
deleted file mode 100644
index 80bc4ab69f..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import jakarta.annotation.PostConstruct;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class AsyncTaskExecutor {
-
- @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}")
- @Getter
- private int asyncTaskParallelismLevel;
- private ExecutorService executorService;
- private static final int DEFAULT_PARALLELISM_LEVEL = 10;
-
- /**
- * Set up executor service with thread-pool size as per configuration parameter.
- * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
- */
- @PostConstruct
- public void setupThreadPool() {
- executorService = Executors.newWorkStealingPool(
- asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
- }
-
- /**
- * Execute supplied task asynchronously.
- *
- * @param taskSupplier functional method is get() task need to executed asynchronously
- * @param timeOutInMillis the task timeout value in milliseconds
- */
- public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
- CompletableFuture.supplyAsync(taskSupplier::get, executorService)
- .orTimeout(timeOutInMillis, MILLISECONDS)
- .whenCompleteAsync(this::handleTaskCompletion);
- }
-
- private void handleTaskCompletion(final Object response, final Throwable throwable) {
- if (throwable != null) {
- if (throwable instanceof TimeoutException) {
- log.error("Async task didn't complete within the required time.", throwable);
- } else {
- log.error("Watchdog async batch failed.", throwable);
- }
- }
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index f039cf3c02..b727e79e70 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -24,8 +24,6 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.exceptions.DataNodeNotFoundException;
@@ -51,12 +49,8 @@ public class ModuleSyncTasks {
* Perform module sync on a batch of cm handles.
*
* @param cmHandleIds a batch of cm handle ids to perform module sync on
- * @param batchCounter the number of batches currently being processed, will be decreased when
- * task is finished or fails
- * @return completed future to handle post-processing
*/
- public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
- final AtomicInteger batchCounter) {
+ public void performModuleSync(final Collection<String> cmHandleIds) {
final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
try {
for (final String cmHandleId : cmHandleIds) {
@@ -74,11 +68,8 @@ public class ModuleSyncTasks {
}
}
} finally {
- batchCounter.getAndDecrement();
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
- log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
}
- return CompletableFuture.completedFuture(null);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 32e1c49f17..6eefedb633 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,13 +27,9 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -46,16 +42,10 @@ public class ModuleSyncWatchdog {
private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
- private final AsyncTaskExecutor asyncTaskExecutor;
private final IMap<String, String> cpsAndNcmpLock;
- private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
- private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
- private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
- @Getter
- private AtomicInteger batchCounter = new AtomicInteger(1);
/**
* Check DB for any cm handles in 'ADVISED' state.
@@ -69,18 +59,11 @@ public class ModuleSyncWatchdog {
log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
- if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
- final Collection<String> nextBatch = prepareNextBatch();
- log.info("Processing module sync batch of {}. {} batch(es) active.",
- nextBatch.size(), batchCounter.get());
- if (!nextBatch.isEmpty()) {
- asyncTaskExecutor.executeTask(() ->
- moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
- ASYNC_TASK_TIMEOUT_IN_MILLISECONDS);
- batchCounter.getAndIncrement();
- }
- } else {
- preventBusyWait();
+ final Collection<String> nextBatch = prepareNextBatch();
+ if (!nextBatch.isEmpty()) {
+ log.info("Processing module sync batch of {}. 1 batch(es) active.", nextBatch.size());
+ moduleSyncTasks.performModuleSync(nextBatch);
+ log.info("Processing module sync batch finished. 0 batch(es) active.");
}
}
}
@@ -153,13 +136,4 @@ public class ModuleSyncWatchdog {
log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
-
- private void preventBusyWait() {
- try {
- log.debug("Busy waiting now");
- sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
deleted file mode 100644
index 7a02fa06e0..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.utils;
-
-import java.util.concurrent.TimeUnit;
-import org.springframework.stereotype.Service;
-
-/**
- * This class is to extract out sleep functionality so the interrupted exception handling can
- * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
- */
-@Service
-public class Sleeper {
- public void haveALittleRest(final long timeInMillis) throws InterruptedException {
- TimeUnit.MILLISECONDS.sleep(timeInMillis);
- }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
deleted file mode 100644
index 751c97a4d0..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync
-
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import spock.lang.Specification
-
-import java.util.concurrent.TimeoutException
-import java.util.function.Supplier
-
-@SpringBootTest(classes = AsyncTaskExecutor)
-class AsyncTaskExecutorSpec extends Specification {
-
- @Autowired
- AsyncTaskExecutor objectUnderTest
- def mockTaskSupplier = Mock(Supplier<Object>)
-
- def 'Parallelism level configuration.'() {
- expect: 'Parallelism level is configured with the correct value'
- assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
- }
-
- def 'Task completion with #caseDescriptor.'() {
- when: 'task completion is handled'
- def irrelevantResponse = null
- objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
- then: 'any exception is swallowed by the task completion (logged)'
- noExceptionThrown()
- where: 'following cases are tested'
- caseDescriptor | exception
- 'no exception' | null
- 'time out exception' | new TimeoutException("time-out")
- 'unexpected exception' | new Exception("some exception")
- }
-
- def 'Task execution.'() {
- when: 'a task is submitted for execution'
- objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
- then: 'the task submission is successful'
- noExceptionThrown()
- }
-
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
index 92f4b38f31..98f3cc05bb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -39,8 +39,6 @@ import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler
import org.slf4j.LoggerFactory
import spock.lang.Specification
-import java.util.concurrent.atomic.AtomicInteger
-
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_SYNC_FAILED
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE_FAILED
@@ -70,8 +68,6 @@ class ModuleSyncTasksSpec extends Specification {
.getOrCreateHazelcastInstance(new Config('hazelcastInstanceName'))
.getMap('mapInstanceName')
- def batchCount = new AtomicInteger(5)
-
def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService,
mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles)
@@ -87,7 +83,7 @@ class ModuleSyncTasksSpec extends Specification {
mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> cmHandle1
mockInventoryPersistence.getYangModelCmHandle('cm-handle-2') >> cmHandle2
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'])
then: 'module sync service is invoked for each cm handle'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' }
@@ -95,8 +91,6 @@ class ModuleSyncTasksSpec extends Specification {
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
assertBatch(args, ['cm-handle-1', 'cm-handle-2'], CmHandleState.READY)
}
- and: 'batch count is decremented by one'
- assert batchCount.get() == 4
}
def 'Handle CM handle failure during #scenario and log MODULE_UPGRADE lock reason'() {
@@ -108,15 +102,13 @@ class ModuleSyncTasksSpec extends Specification {
mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { throw new Exception('some exception') }
mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { throw new Exception('some exception') }
when: 'module sync is executed'
- objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle'])
then: 'lock reason is updated with number of attempts'
1 * mockSyncUtils.updateLockReasonWithAttempts(_, expectedLockReasonCategory, 'some exception')
and: 'the state handler is called to update the state to LOCKED'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
assertBatch(args, ['cm-handle'], CmHandleState.LOCKED)
}
- and: 'batch count is decremented by one'
- assert batchCount.get() == 4
where:
scenario | lockReasonCategory | lockReasonDetails || expectedLockReasonCategory
'module sync' | MODULE_SYNC_FAILED | 'some lock details' || MODULE_SYNC_FAILED
@@ -132,7 +124,7 @@ class ModuleSyncTasksSpec extends Specification {
and: 'a cm handle in advised state'
mockInventoryPersistence.getYangModelCmHandle('cm-handle-3') >> cmHandleByIdAndState('cm-handle-3', CmHandleState.ADVISED)
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'])
then: 'no exception is thrown'
noExceptionThrown()
and: 'the deleted cm-handle did not sync'
@@ -176,7 +168,7 @@ class ModuleSyncTasksSpec extends Specification {
and: 'entry in progress map for other cm handle'
moduleSyncStartedOnCmHandles.put('other-cm-handle', 'started')
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1'])
then: 'module sync service is invoked for cm handle'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
and: 'the entry for other cm handle is still in the progress map'
@@ -201,7 +193,7 @@ class ModuleSyncTasksSpec extends Specification {
cmHandle.compositeState.setLockReason(CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).build())
mockInventoryPersistence.getYangModelCmHandle('cm-handle') >> cmHandle
when: 'module sync is executed'
- objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle'])
then: 'the module sync service should attempt to sync and upgrade the CM handle'
1 * mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { args ->
assert args[0].id == 'cm-handle'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index a9b88c2d3b..68aa6a1b6a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,14 +22,10 @@
package org.onap.cps.ncmp.impl.inventory.sync
import com.hazelcast.map.IMap
+import java.util.concurrent.ArrayBlockingQueue
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.impl.utils.Sleeper
-import org.onap.cps.api.model.DataNode
import spock.lang.Specification
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.locks.Lock
-
class ModuleSyncWatchdogSpec extends Specification {
def mockModuleOperationsUtils = Mock(ModuleOperationsUtils)
@@ -42,17 +38,9 @@ class ModuleSyncWatchdogSpec extends Specification {
def mockModuleSyncTasks = Mock(ModuleSyncTasks)
- def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
-
def mockCpsAndNcmpLock = Mock(IMap<String,String>)
- def spiedSleeper = Spy(Sleeper)
-
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
-
- void setup() {
- spiedAsyncTaskExecutor.setupThreadPool()
- }
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock)
def 'Module sync advised cm handles with #scenario.'() {
given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
@@ -61,12 +49,10 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor has enough available threads'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
- expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+ expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(*_)
and: 'the executing thread is unlocked'
1 * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following parameter are used'
@@ -84,12 +70,10 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor first has no threads but has one thread on the second attempt'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs one task'
- 1 * spiedAsyncTaskExecutor.executeTask(*_)
+ 1 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Module sync advised cm handle already handled by other thread.'() {
@@ -97,27 +81,21 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor has a thread available'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
- when: ' module sync is started'
+ when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it does NOT execute a task to process the (empty) batch'
- 0 * spiedAsyncTaskExecutor.executeTask(*_)
+ 0 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Module sync with previous cm handle(s) left in work queue.'() {
given: 'there is still a cm handle in the queue'
moduleSyncWorkQueue.offer('ch-1')
- and: 'sync utilities returns many advise cm handles'
- mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(500)
- and: 'the executor has plenty threads available'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
- when: ' module sync is started'
+ when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it does executes only one task to process the remaining handle in the queue'
- 1 * spiedAsyncTaskExecutor.executeTask(*_)
+ 1 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Reset failed cm handles.'() {
@@ -147,15 +125,6 @@ class ModuleSyncWatchdogSpec extends Specification {
true || false || 1
}
- def 'Sleeper gets interrupted.'() {
- given: 'sleeper gets interrupted'
- spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
- when: 'the watchdog attempts to sleep to save cpu cycles'
- objectUnderTest.preventBusyWait()
- then: 'no exception is thrown'
- noExceptionThrown()
- }
-
def createCmHandleIds(numberOfCmHandles) {
return (numberOfCmHandles > 0) ? (1..numberOfCmHandles).collect { 'ch-'+it } : []
}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 12db639633..3276ceb534 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -77,10 +77,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 3
-
policy-executor:
enabled: true
defaultDecision: "some default decision"