diff options
Diffstat (limited to 'cps-ncmp-service/src')
7 files changed, 226 insertions, 48 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java index 5e26650eda..597e2ba8e5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; @@ -52,28 +53,35 @@ public class ModuleSyncTasks { * Perform module sync on a batch of cm handles. * * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @param batchCounter the number of batches currently being processed, will be decreased when task is finished + * or fails * @return completed future to handle post-processing */ - public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) { - final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>(); - for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { - final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); - final YangModelCmHandle yangModelCmHandle = - YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(cmHandleId); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); - cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); - } catch (final Exception e) { - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); - setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); - cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes, + final AtomicInteger batchCounter) { + try { + final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>(); + for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { + final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); + final YangModelCmHandle yangModelCmHandle = + YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(cmHandleId); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); + } catch (final Exception e) { + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + } + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); } - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + updateCmHandlesStateBatch(cmHandelStatePerCmHandle); + } finally { + batchCounter.getAndDecrement(); } - updateCmHandlesStateBatch(cmHandelStatePerCmHandle); return COMPLETED_FUTURE; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 8074fe6fe1..73954c36b3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -27,36 +27,50 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor; import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor -@Component +@Service public class ModuleSyncWatchdog { private final SyncUtils syncUtils; private final BlockingQueue<DataNode> moduleSyncWorkQueue; private final Map<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; - + private final AsyncTaskExecutor asyncTaskExecutor; private static final int MODULE_SYNC_BATCH_SIZE = 100; private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; + private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5); + @Getter + private AtomicInteger batchCounter = new AtomicInteger(1); /** - * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. + * Check DB for any cm handles in 'ADVISED' state. + * Queue and create batches to process them asynchronously. + * This method will only finish when there are no more 'ADVISED' cm handles in the DB. + * This method wil be triggered on a configurable interval */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { populateWorkQueueIfNeeded(); - while (!moduleSyncWorkQueue.isEmpty()) { + final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel(); + while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) { + batchCounter.getAndIncrement(); final Collection<DataNode> nextBatch = prepareNextBatch(); - moduleSyncTasks.performModuleSync(nextBatch); + asyncTaskExecutor.executeTask(() -> + moduleSyncTasks.performModuleSync(nextBatch, batchCounter), + ASYNC_TASK_TIMEOUT_IN_MILLISECONDS + ); preventBusyWait(); } } @@ -71,8 +85,6 @@ public class ModuleSyncWatchdog { } private void preventBusyWait() { - // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution - // but leaving here to minimize impacts on this class for that Jira try { TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); } catch (final InterruptedException e) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java new file mode 100644 index 0000000000..7b4d2cfaa9 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync.executor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import javax.annotation.PostConstruct; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class AsyncTaskExecutor { + + @Value("${modules-sync-watchdog.async-executor.parallelism-level:10}") + @Getter + private int asyncTaskParallelismLevel; + private ExecutorService executorService; + private static final int DEFAULT_PARALLELISM_LEVEL = 10; + + /** + * Set up executor service with thread-pool size as per configuration parameter. + * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied. + */ + @PostConstruct + public void setupThreadPool() { + executorService = Executors.newWorkStealingPool( + asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel); + } + + /** + * Execute supplied task asynchronously. + * + * @param taskSupplier functional method is get() task need to executed asynchronously + * @param timeOutInMillis the task timeout value in milliseconds + */ + public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) { + CompletableFuture.supplyAsync(taskSupplier::get, executorService) + .orTimeout(timeOutInMillis, MILLISECONDS) + .whenCompleteAsync(this::handleTaskCompletion); + } + + private void handleTaskCompletion(final Object response, final Throwable throwable) { + if (throwable != null) { + if (throwable instanceof TimeoutException) { + log.warn("Async task didn't completed within the required time."); + } else { + log.debug("Watchdog async batch failed. caused by : {}", throwable.getMessage()); + } + } + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy index 291ba968ff..a2339963e3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy @@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.inventory.InventoryPersistence import org.onap.cps.ncmp.api.inventory.LockReasonCategory import org.onap.cps.spi.model.DataNode import spock.lang.Specification +import java.util.concurrent.atomic.AtomicInteger class ModuleSyncTasksSpec extends Specification { @@ -41,6 +42,8 @@ class ModuleSyncTasksSpec extends Specification { def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler) + def batchCount = new AtomicInteger(5) + def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler) def 'Module Sync ADVISED cm handles.'() { @@ -50,15 +53,17 @@ class ModuleSyncTasksSpec extends Specification { and: 'the inventory persistence cm handle returns a ADVISED state for the any handle' mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED) when: 'module sync poll is executed' - objectUnderTest.performModuleSync([cmHandle1, cmHandle2]) + objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount) then: 'module sync service deletes schemas set of each cm handle if it already exists' 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1') 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2') and: 'module sync service is invoked for each cm handle' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') } - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-1') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-2') } and: 'the state handler is called for the both cm handles' 2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY) + and: 'batch count is decremented by one' + assert batchCount.get() == 4 } def 'Module Sync ADVISED cm handle with failure during sync.'() { @@ -70,11 +75,13 @@ class ModuleSyncTasksSpec extends Specification { and: 'module sync service attempts to sync the cm handle and throws an exception' 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') } when: 'module sync is executed' - objectUnderTest.performModuleSync([cmHandle]) + objectUnderTest.performModuleSync([cmHandle], batchCount) then: 'update lock reason, details and attempts is invoked' - 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception') + 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, 'some exception') and: 'the state handler is called to update the state to LOCKED' 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED) + and: 'batch count is decremented by one' + assert batchCount.get() == 4 } def 'Reset failed CM Handles #scenario.'() { @@ -90,14 +97,14 @@ class ModuleSyncTasksSpec extends Specification { then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry' expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED) where: - scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState - 'retry locked cm handle once' | [true, false] || 1 - 'retry locked cm handle twice' | [true, true] || 2 - 'do not retry locked cm handle' | [false, false] || 0 + scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState + 'retry locked cm handle once' | [true, false] || 1 + 'retry locked cm handle twice' | [true, true] || 2 + 'do not retry locked cm handle' | [false, false] || 0 } def advisedCmHandleAsDataNode(cmHandleId) { - return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED']) + return new DataNode(anchorName: cmHandleId, leaves: ['id': cmHandleId, 'cm-handle-state': 'ADVISED']) } def assertYamgModelCmHandleArgument(args, expectedCmHandleId) { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 43f492dbd7..e5240c0e66 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.api.inventory.sync import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle - +import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue import org.onap.cps.spi.model.DataNode @@ -34,28 +34,38 @@ class ModuleSyncWatchdogSpec extends Specification { def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE - BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) + def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) def moduleSyncStartedOnCmHandles = [:] def mockModuleSyncTasks = Mock(ModuleSyncTasks) - def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks) + def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) + + def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, + mockModuleSyncTasks, spiedAsyncTaskExecutor) + + void setup() { + spiedAsyncTaskExecutor.setupThreadPool(); + } - def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() { + def 'Module sync advised cm handles with #scenario.'() { given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles' mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) + and: 'the executor has #parallelismLevel available threads' + spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel when: ' module sync is started' objectUnderTest.moduleSyncAdvisedCmHandles() then: 'it performs #expectedNumberOfTaskExecutions tasks' - expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_) - where: - scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions - 'less then 1 batch' | 1 || 1 - 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 - '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 - 'queue capacity' | testQueueCapacity || 3 - 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 + expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) + where: ' the following parameter are used' + scenario | parallelismLevel | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions + 'less then 1 batch' | 9 | 1 || 1 + 'exactly 1 batch' | 9 | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 + '2 batches' | 9 | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 + 'queue capacity' | 9 | testQueueCapacity || 3 + 'over queue capacity' | 9 | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 + 'not enough threads' | 2 | testQueueCapacity || 2 } def 'Reset failed cm handles.'() { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy new file mode 100644 index 0000000000..ba1820e645 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync.executor + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification +import java.util.concurrent.TimeoutException +import java.util.function.Supplier + +@SpringBootTest(classes = AsyncTaskExecutor) +class AsyncTaskExecutorSpec extends Specification { + + @Autowired + AsyncTaskExecutor objectUnderTest + def mockTaskSupplier = Mock(Supplier<Object>) + + def 'Parallelism level configuration.'() { + expect: 'Parallelism level is configured with the correct value' + assert objectUnderTest.getAsyncTaskParallelismLevel() == 3 + } + + def 'Task completion with #caseDescriptor.'() { + when: 'task completion is handled' + def irrelevantResponse = null + objectUnderTest.handleTaskCompletion(irrelevantResponse, exception); + then: 'any exception is swallowed by the task completion (logged)' + noExceptionThrown() + where: 'following cases are tested' + caseDescriptor | exception + 'no exception' | null + 'time out exception' | new TimeoutException("time-out") + 'unexpected exception' | new Exception("some exception") + } + + def 'Task execution.'() { + when: 'a task is submitted for execution' + objectUnderTest.executeTask(() -> mockTaskSupplier, 0) + then: 'the task submission is successful' + noExceptionThrown() + } + +} diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml index c23926e4eb..03d70c26c6 100644 --- a/cps-ncmp-service/src/test/resources/application.yml +++ b/cps-ncmp-service/src/test/resources/application.yml @@ -23,3 +23,6 @@ dmi: api: base-path: dmi +modules-sync-watchdog: + async-executor: + parallelism-level: 3
\ No newline at end of file |