summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java44
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java28
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java77
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy27
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy34
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy61
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml3
7 files changed, 226 insertions, 48 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
index 5e26650eda..597e2ba8e5 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
@@ -52,28 +53,35 @@ public class ModuleSyncTasks {
* Perform module sync on a batch of cm handles.
*
* @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+ * @param batchCounter the number of batches currently being processed, will be decreased when task is finished
+ * or fails
* @return completed future to handle post-processing
*/
- public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
- final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
- for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
- final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
- final YangModelCmHandle yangModelCmHandle =
- YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
- try {
- moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
- moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
- cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
- } catch (final Exception e) {
- syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
- LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
- setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
- cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+ public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
+ final AtomicInteger batchCounter) {
+ try {
+ final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+ for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+ final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+ final YangModelCmHandle yangModelCmHandle =
+ YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+ final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+ try {
+ moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+ moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+ cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+ } catch (final Exception e) {
+ syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+ LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+ cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+ }
+ log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
}
- log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+ updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
+ } finally {
+ batchCounter.getAndDecrement();
}
- updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
return COMPLETED_FUTURE;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index 8074fe6fe1..73954c36b3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -27,36 +27,50 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor;
import org.onap.cps.spi.model.DataNode;
import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
@Slf4j
@RequiredArgsConstructor
-@Component
+@Service
public class ModuleSyncWatchdog {
private final SyncUtils syncUtils;
private final BlockingQueue<DataNode> moduleSyncWorkQueue;
private final Map<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
-
+ private final AsyncTaskExecutor asyncTaskExecutor;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
+ private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
+ @Getter
+ private AtomicInteger batchCounter = new AtomicInteger(1);
/**
- * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
+ * Check DB for any cm handles in 'ADVISED' state.
+ * Queue and create batches to process them asynchronously.
+ * This method will only finish when there are no more 'ADVISED' cm handles in the DB.
+ * This method wil be triggered on a configurable interval
*/
@Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
public void moduleSyncAdvisedCmHandles() {
populateWorkQueueIfNeeded();
- while (!moduleSyncWorkQueue.isEmpty()) {
+ final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel();
+ while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) {
+ batchCounter.getAndIncrement();
final Collection<DataNode> nextBatch = prepareNextBatch();
- moduleSyncTasks.performModuleSync(nextBatch);
+ asyncTaskExecutor.executeTask(() ->
+ moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
+ ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
+ );
preventBusyWait();
}
}
@@ -71,8 +85,6 @@ public class ModuleSyncWatchdog {
}
private void preventBusyWait() {
- // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution
- // but leaving here to minimize impacts on this class for that Jira
try {
TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
} catch (final InterruptedException e) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java
new file mode 100644
index 0000000000..7b4d2cfaa9
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.executor;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import javax.annotation.PostConstruct;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class AsyncTaskExecutor {
+
+ @Value("${modules-sync-watchdog.async-executor.parallelism-level:10}")
+ @Getter
+ private int asyncTaskParallelismLevel;
+ private ExecutorService executorService;
+ private static final int DEFAULT_PARALLELISM_LEVEL = 10;
+
+ /**
+ * Set up executor service with thread-pool size as per configuration parameter.
+ * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
+ */
+ @PostConstruct
+ public void setupThreadPool() {
+ executorService = Executors.newWorkStealingPool(
+ asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
+ }
+
+ /**
+ * Execute supplied task asynchronously.
+ *
+ * @param taskSupplier functional method is get() task need to executed asynchronously
+ * @param timeOutInMillis the task timeout value in milliseconds
+ */
+ public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
+ CompletableFuture.supplyAsync(taskSupplier::get, executorService)
+ .orTimeout(timeOutInMillis, MILLISECONDS)
+ .whenCompleteAsync(this::handleTaskCompletion);
+ }
+
+ private void handleTaskCompletion(final Object response, final Throwable throwable) {
+ if (throwable != null) {
+ if (throwable instanceof TimeoutException) {
+ log.warn("Async task didn't completed within the required time.");
+ } else {
+ log.debug("Watchdog async batch failed. caused by : {}", throwable.getMessage());
+ }
+ }
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
index 291ba968ff..a2339963e3 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.inventory.LockReasonCategory
import org.onap.cps.spi.model.DataNode
import spock.lang.Specification
+import java.util.concurrent.atomic.AtomicInteger
class ModuleSyncTasksSpec extends Specification {
@@ -41,6 +42,8 @@ class ModuleSyncTasksSpec extends Specification {
def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
+ def batchCount = new AtomicInteger(5)
+
def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler)
def 'Module Sync ADVISED cm handles.'() {
@@ -50,15 +53,17 @@ class ModuleSyncTasksSpec extends Specification {
and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync([cmHandle1, cmHandle2])
+ objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount)
then: 'module sync service deletes schemas set of each cm handle if it already exists'
1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1')
1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2')
and: 'module sync service is invoked for each cm handle'
- 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') }
- 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') }
+ 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-1') }
+ 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-2') }
and: 'the state handler is called for the both cm handles'
2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY)
+ and: 'batch count is decremented by one'
+ assert batchCount.get() == 4
}
def 'Module Sync ADVISED cm handle with failure during sync.'() {
@@ -70,11 +75,13 @@ class ModuleSyncTasksSpec extends Specification {
and: 'module sync service attempts to sync the cm handle and throws an exception'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
when: 'module sync is executed'
- objectUnderTest.performModuleSync([cmHandle])
+ objectUnderTest.performModuleSync([cmHandle], batchCount)
then: 'update lock reason, details and attempts is invoked'
- 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
+ 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, 'some exception')
and: 'the state handler is called to update the state to LOCKED'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED)
+ and: 'batch count is decremented by one'
+ assert batchCount.get() == 4
}
def 'Reset failed CM Handles #scenario.'() {
@@ -90,14 +97,14 @@ class ModuleSyncTasksSpec extends Specification {
then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED)
where:
- scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
- 'retry locked cm handle once' | [true, false] || 1
- 'retry locked cm handle twice' | [true, true] || 2
- 'do not retry locked cm handle' | [false, false] || 0
+ scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
+ 'retry locked cm handle once' | [true, false] || 1
+ 'retry locked cm handle twice' | [true, true] || 2
+ 'do not retry locked cm handle' | [false, false] || 0
}
def advisedCmHandleAsDataNode(cmHandleId) {
- return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED'])
+ return new DataNode(anchorName: cmHandleId, leaves: ['id': cmHandleId, 'cm-handle-state': 'ADVISED'])
}
def assertYamgModelCmHandleArgument(args, expectedCmHandleId) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 43f492dbd7..e5240c0e66 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -22,7 +22,7 @@
package org.onap.cps.ncmp.api.inventory.sync
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-
+import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import org.onap.cps.spi.model.DataNode
@@ -34,28 +34,38 @@ class ModuleSyncWatchdogSpec extends Specification {
def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
- BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
+ def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
def moduleSyncStartedOnCmHandles = [:]
def mockModuleSyncTasks = Mock(ModuleSyncTasks)
- def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks)
+ def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
+
+ def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles,
+ mockModuleSyncTasks, spiedAsyncTaskExecutor)
+
+ void setup() {
+ spiedAsyncTaskExecutor.setupThreadPool();
+ }
- def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() {
+ def 'Module sync advised cm handles with #scenario.'() {
given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+ and: 'the executor has #parallelismLevel available threads'
+ spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
- expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_)
- where:
- scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
- 'less then 1 batch' | 1 || 1
- 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
- '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
- 'queue capacity' | testQueueCapacity || 3
- 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+ expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+ where: ' the following parameter are used'
+ scenario | parallelismLevel | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
+ 'less then 1 batch' | 9 | 1 || 1
+ 'exactly 1 batch' | 9 | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
+ '2 batches' | 9 | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
+ 'queue capacity' | 9 | testQueueCapacity || 3
+ 'over queue capacity' | 9 | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+ 'not enough threads' | 2 | testQueueCapacity || 2
}
def 'Reset failed cm handles.'() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy
new file mode 100644
index 0000000000..ba1820e645
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy
@@ -0,0 +1,61 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.executor
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+import java.util.concurrent.TimeoutException
+import java.util.function.Supplier
+
+@SpringBootTest(classes = AsyncTaskExecutor)
+class AsyncTaskExecutorSpec extends Specification {
+
+ @Autowired
+ AsyncTaskExecutor objectUnderTest
+ def mockTaskSupplier = Mock(Supplier<Object>)
+
+ def 'Parallelism level configuration.'() {
+ expect: 'Parallelism level is configured with the correct value'
+ assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
+ }
+
+ def 'Task completion with #caseDescriptor.'() {
+ when: 'task completion is handled'
+ def irrelevantResponse = null
+ objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
+ then: 'any exception is swallowed by the task completion (logged)'
+ noExceptionThrown()
+ where: 'following cases are tested'
+ caseDescriptor | exception
+ 'no exception' | null
+ 'time out exception' | new TimeoutException("time-out")
+ 'unexpected exception' | new Exception("some exception")
+ }
+
+ def 'Task execution.'() {
+ when: 'a task is submitted for execution'
+ objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
+ then: 'the task submission is successful'
+ noExceptionThrown()
+ }
+
+}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index c23926e4eb..03d70c26c6 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -23,3 +23,6 @@ dmi:
api:
base-path: dmi
+modules-sync-watchdog:
+ async-executor:
+ parallelism-level: 3 \ No newline at end of file