summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordanielhanrahan <daniel.hanrahan@est.tech>2025-01-29 18:35:15 +0000
committerdanielhanrahan <daniel.hanrahan@est.tech>2025-02-04 10:24:58 +0000
commite08721866f0792ea830d05c415758ce98a7a9b14 (patch)
tree6fac984083dc3253c0d91cc3dc1f65d9c947fa37
parent59f1cc4c5994da34f2c48a2347499f9cfb7bd836 (diff)
Remove multithreading from module sync watchdog
After introduction of module set tag improvements, there is no need to multithreading in module sync. Performance impact is minimal. Issue-ID: CPS-2165 Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech> Change-Id: I1557fc8348d39da3654a1b92944c6ad49fa8670d
-rw-r--r--cps-application/src/main/resources/application.yml4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java77
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java38
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy63
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy18
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy49
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml4
-rw-r--r--integration-test/src/test/resources/application.yml4
10 files changed, 21 insertions, 282 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 6b9c694cf2..c10a26fe0b 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -247,10 +247,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 10
-
model-loader:
maximum-attempt-count: 20
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
deleted file mode 100644
index 80bc4ab69f..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import jakarta.annotation.PostConstruct;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class AsyncTaskExecutor {
-
- @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}")
- @Getter
- private int asyncTaskParallelismLevel;
- private ExecutorService executorService;
- private static final int DEFAULT_PARALLELISM_LEVEL = 10;
-
- /**
- * Set up executor service with thread-pool size as per configuration parameter.
- * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
- */
- @PostConstruct
- public void setupThreadPool() {
- executorService = Executors.newWorkStealingPool(
- asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
- }
-
- /**
- * Execute supplied task asynchronously.
- *
- * @param taskSupplier functional method is get() task need to executed asynchronously
- * @param timeOutInMillis the task timeout value in milliseconds
- */
- public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
- CompletableFuture.supplyAsync(taskSupplier::get, executorService)
- .orTimeout(timeOutInMillis, MILLISECONDS)
- .whenCompleteAsync(this::handleTaskCompletion);
- }
-
- private void handleTaskCompletion(final Object response, final Throwable throwable) {
- if (throwable != null) {
- if (throwable instanceof TimeoutException) {
- log.error("Async task didn't complete within the required time.", throwable);
- } else {
- log.error("Watchdog async batch failed.", throwable);
- }
- }
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index f039cf3c02..b727e79e70 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -24,8 +24,6 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.exceptions.DataNodeNotFoundException;
@@ -51,12 +49,8 @@ public class ModuleSyncTasks {
* Perform module sync on a batch of cm handles.
*
* @param cmHandleIds a batch of cm handle ids to perform module sync on
- * @param batchCounter the number of batches currently being processed, will be decreased when
- * task is finished or fails
- * @return completed future to handle post-processing
*/
- public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
- final AtomicInteger batchCounter) {
+ public void performModuleSync(final Collection<String> cmHandleIds) {
final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
try {
for (final String cmHandleId : cmHandleIds) {
@@ -74,11 +68,8 @@ public class ModuleSyncTasks {
}
}
} finally {
- batchCounter.getAndDecrement();
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
- log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
}
- return CompletableFuture.completedFuture(null);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 32e1c49f17..6eefedb633 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,13 +27,9 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -46,16 +42,10 @@ public class ModuleSyncWatchdog {
private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
- private final AsyncTaskExecutor asyncTaskExecutor;
private final IMap<String, String> cpsAndNcmpLock;
- private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
- private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
- private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
- @Getter
- private AtomicInteger batchCounter = new AtomicInteger(1);
/**
* Check DB for any cm handles in 'ADVISED' state.
@@ -69,18 +59,11 @@ public class ModuleSyncWatchdog {
log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
- if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
- final Collection<String> nextBatch = prepareNextBatch();
- log.info("Processing module sync batch of {}. {} batch(es) active.",
- nextBatch.size(), batchCounter.get());
- if (!nextBatch.isEmpty()) {
- asyncTaskExecutor.executeTask(() ->
- moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
- ASYNC_TASK_TIMEOUT_IN_MILLISECONDS);
- batchCounter.getAndIncrement();
- }
- } else {
- preventBusyWait();
+ final Collection<String> nextBatch = prepareNextBatch();
+ if (!nextBatch.isEmpty()) {
+ log.info("Processing module sync batch of {}. 1 batch(es) active.", nextBatch.size());
+ moduleSyncTasks.performModuleSync(nextBatch);
+ log.info("Processing module sync batch finished. 0 batch(es) active.");
}
}
}
@@ -153,13 +136,4 @@ public class ModuleSyncWatchdog {
log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
-
- private void preventBusyWait() {
- try {
- log.debug("Busy waiting now");
- sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
deleted file mode 100644
index 7a02fa06e0..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.utils;
-
-import java.util.concurrent.TimeUnit;
-import org.springframework.stereotype.Service;
-
-/**
- * This class is to extract out sleep functionality so the interrupted exception handling can
- * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
- */
-@Service
-public class Sleeper {
- public void haveALittleRest(final long timeInMillis) throws InterruptedException {
- TimeUnit.MILLISECONDS.sleep(timeInMillis);
- }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
deleted file mode 100644
index 751c97a4d0..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync
-
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import spock.lang.Specification
-
-import java.util.concurrent.TimeoutException
-import java.util.function.Supplier
-
-@SpringBootTest(classes = AsyncTaskExecutor)
-class AsyncTaskExecutorSpec extends Specification {
-
- @Autowired
- AsyncTaskExecutor objectUnderTest
- def mockTaskSupplier = Mock(Supplier<Object>)
-
- def 'Parallelism level configuration.'() {
- expect: 'Parallelism level is configured with the correct value'
- assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
- }
-
- def 'Task completion with #caseDescriptor.'() {
- when: 'task completion is handled'
- def irrelevantResponse = null
- objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
- then: 'any exception is swallowed by the task completion (logged)'
- noExceptionThrown()
- where: 'following cases are tested'
- caseDescriptor | exception
- 'no exception' | null
- 'time out exception' | new TimeoutException("time-out")
- 'unexpected exception' | new Exception("some exception")
- }
-
- def 'Task execution.'() {
- when: 'a task is submitted for execution'
- objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
- then: 'the task submission is successful'
- noExceptionThrown()
- }
-
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
index 92f4b38f31..98f3cc05bb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -39,8 +39,6 @@ import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler
import org.slf4j.LoggerFactory
import spock.lang.Specification
-import java.util.concurrent.atomic.AtomicInteger
-
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_SYNC_FAILED
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE_FAILED
@@ -70,8 +68,6 @@ class ModuleSyncTasksSpec extends Specification {
.getOrCreateHazelcastInstance(new Config('hazelcastInstanceName'))
.getMap('mapInstanceName')
- def batchCount = new AtomicInteger(5)
-
def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService,
mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles)
@@ -87,7 +83,7 @@ class ModuleSyncTasksSpec extends Specification {
mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> cmHandle1
mockInventoryPersistence.getYangModelCmHandle('cm-handle-2') >> cmHandle2
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'])
then: 'module sync service is invoked for each cm handle'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' }
@@ -95,8 +91,6 @@ class ModuleSyncTasksSpec extends Specification {
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
assertBatch(args, ['cm-handle-1', 'cm-handle-2'], CmHandleState.READY)
}
- and: 'batch count is decremented by one'
- assert batchCount.get() == 4
}
def 'Handle CM handle failure during #scenario and log MODULE_UPGRADE lock reason'() {
@@ -108,15 +102,13 @@ class ModuleSyncTasksSpec extends Specification {
mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { throw new Exception('some exception') }
mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { throw new Exception('some exception') }
when: 'module sync is executed'
- objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle'])
then: 'lock reason is updated with number of attempts'
1 * mockSyncUtils.updateLockReasonWithAttempts(_, expectedLockReasonCategory, 'some exception')
and: 'the state handler is called to update the state to LOCKED'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
assertBatch(args, ['cm-handle'], CmHandleState.LOCKED)
}
- and: 'batch count is decremented by one'
- assert batchCount.get() == 4
where:
scenario | lockReasonCategory | lockReasonDetails || expectedLockReasonCategory
'module sync' | MODULE_SYNC_FAILED | 'some lock details' || MODULE_SYNC_FAILED
@@ -132,7 +124,7 @@ class ModuleSyncTasksSpec extends Specification {
and: 'a cm handle in advised state'
mockInventoryPersistence.getYangModelCmHandle('cm-handle-3') >> cmHandleByIdAndState('cm-handle-3', CmHandleState.ADVISED)
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'])
then: 'no exception is thrown'
noExceptionThrown()
and: 'the deleted cm-handle did not sync'
@@ -176,7 +168,7 @@ class ModuleSyncTasksSpec extends Specification {
and: 'entry in progress map for other cm handle'
moduleSyncStartedOnCmHandles.put('other-cm-handle', 'started')
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1'])
then: 'module sync service is invoked for cm handle'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
and: 'the entry for other cm handle is still in the progress map'
@@ -201,7 +193,7 @@ class ModuleSyncTasksSpec extends Specification {
cmHandle.compositeState.setLockReason(CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).build())
mockInventoryPersistence.getYangModelCmHandle('cm-handle') >> cmHandle
when: 'module sync is executed'
- objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle'])
then: 'the module sync service should attempt to sync and upgrade the CM handle'
1 * mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { args ->
assert args[0].id == 'cm-handle'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index a9b88c2d3b..68aa6a1b6a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,14 +22,10 @@
package org.onap.cps.ncmp.impl.inventory.sync
import com.hazelcast.map.IMap
+import java.util.concurrent.ArrayBlockingQueue
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.impl.utils.Sleeper
-import org.onap.cps.api.model.DataNode
import spock.lang.Specification
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.locks.Lock
-
class ModuleSyncWatchdogSpec extends Specification {
def mockModuleOperationsUtils = Mock(ModuleOperationsUtils)
@@ -42,17 +38,9 @@ class ModuleSyncWatchdogSpec extends Specification {
def mockModuleSyncTasks = Mock(ModuleSyncTasks)
- def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
-
def mockCpsAndNcmpLock = Mock(IMap<String,String>)
- def spiedSleeper = Spy(Sleeper)
-
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
-
- void setup() {
- spiedAsyncTaskExecutor.setupThreadPool()
- }
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock)
def 'Module sync advised cm handles with #scenario.'() {
given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
@@ -61,12 +49,10 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor has enough available threads'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
- expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+ expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(*_)
and: 'the executing thread is unlocked'
1 * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following parameter are used'
@@ -84,12 +70,10 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor first has no threads but has one thread on the second attempt'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs one task'
- 1 * spiedAsyncTaskExecutor.executeTask(*_)
+ 1 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Module sync advised cm handle already handled by other thread.'() {
@@ -97,27 +81,21 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor has a thread available'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
- when: ' module sync is started'
+ when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it does NOT execute a task to process the (empty) batch'
- 0 * spiedAsyncTaskExecutor.executeTask(*_)
+ 0 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Module sync with previous cm handle(s) left in work queue.'() {
given: 'there is still a cm handle in the queue'
moduleSyncWorkQueue.offer('ch-1')
- and: 'sync utilities returns many advise cm handles'
- mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(500)
- and: 'the executor has plenty threads available'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
- when: ' module sync is started'
+ when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it does executes only one task to process the remaining handle in the queue'
- 1 * spiedAsyncTaskExecutor.executeTask(*_)
+ 1 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Reset failed cm handles.'() {
@@ -147,15 +125,6 @@ class ModuleSyncWatchdogSpec extends Specification {
true || false || 1
}
- def 'Sleeper gets interrupted.'() {
- given: 'sleeper gets interrupted'
- spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
- when: 'the watchdog attempts to sleep to save cpu cycles'
- objectUnderTest.preventBusyWait()
- then: 'no exception is thrown'
- noExceptionThrown()
- }
-
def createCmHandleIds(numberOfCmHandles) {
return (numberOfCmHandles > 0) ? (1..numberOfCmHandles).collect { 'ch-'+it } : []
}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 12db639633..3276ceb534 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -77,10 +77,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 3
-
policy-executor:
enabled: true
defaultDecision: "some default decision"
diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml
index 30598dfb90..8ede492ad1 100644
--- a/integration-test/src/test/resources/application.yml
+++ b/integration-test/src/test/resources/application.yml
@@ -189,10 +189,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 2
-
model-loader:
maximum-attempt-count: 20