From 791af2fb0f717a2b58d55a7d16d6fb33060b7205 Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Wed, 31 Aug 2022 11:58:09 +0100 Subject: Performance Improvement: Watchdog Parallel execution with configuration - Introduced AsyncSyncExecutor to get task and execute it with configured number of parallel threads. - Number of parallel thread can be configured from application.yml. - AsyncTaskExecutorSpec is added - Fixed existing grovvy test now async task would be submitted. Issue-ID: CPS-1200 Change-Id: I58c0368b945c90e619c2acfc7458ba58de047484 Signed-off-by: ToineSiebelink Signed-off-by: sourabh_sourabh --- .../api/inventory/sync/ModuleSyncTasksSpec.groovy | 27 ++++++---- .../inventory/sync/ModuleSyncWatchdogSpec.groovy | 34 +++++++----- .../sync/executor/AsyncTaskExecutorSpec.groovy | 61 ++++++++++++++++++++++ 3 files changed, 100 insertions(+), 22 deletions(-) create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy (limited to 'cps-ncmp-service/src/test/groovy/org') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy index 291ba968f..a2339963e 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy @@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.inventory.InventoryPersistence import org.onap.cps.ncmp.api.inventory.LockReasonCategory import org.onap.cps.spi.model.DataNode import spock.lang.Specification +import java.util.concurrent.atomic.AtomicInteger class ModuleSyncTasksSpec extends Specification { @@ -41,6 +42,8 @@ class ModuleSyncTasksSpec extends Specification { def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler) + def batchCount = new AtomicInteger(5) + def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler) def 'Module Sync ADVISED cm handles.'() { @@ -50,15 +53,17 @@ class ModuleSyncTasksSpec extends Specification { and: 'the inventory persistence cm handle returns a ADVISED state for the any handle' mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED) when: 'module sync poll is executed' - objectUnderTest.performModuleSync([cmHandle1, cmHandle2]) + objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount) then: 'module sync service deletes schemas set of each cm handle if it already exists' 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1') 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2') and: 'module sync service is invoked for each cm handle' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') } - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-1') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-2') } and: 'the state handler is called for the both cm handles' 2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY) + and: 'batch count is decremented by one' + assert batchCount.get() == 4 } def 'Module Sync ADVISED cm handle with failure during sync.'() { @@ -70,11 +75,13 @@ class ModuleSyncTasksSpec extends Specification { and: 'module sync service attempts to sync the cm handle and throws an exception' 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') } when: 'module sync is executed' - objectUnderTest.performModuleSync([cmHandle]) + objectUnderTest.performModuleSync([cmHandle], batchCount) then: 'update lock reason, details and attempts is invoked' - 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception') + 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, 'some exception') and: 'the state handler is called to update the state to LOCKED' 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED) + and: 'batch count is decremented by one' + assert batchCount.get() == 4 } def 'Reset failed CM Handles #scenario.'() { @@ -90,14 +97,14 @@ class ModuleSyncTasksSpec extends Specification { then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry' expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED) where: - scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState - 'retry locked cm handle once' | [true, false] || 1 - 'retry locked cm handle twice' | [true, true] || 2 - 'do not retry locked cm handle' | [false, false] || 0 + scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState + 'retry locked cm handle once' | [true, false] || 1 + 'retry locked cm handle twice' | [true, true] || 2 + 'do not retry locked cm handle' | [false, false] || 0 } def advisedCmHandleAsDataNode(cmHandleId) { - return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED']) + return new DataNode(anchorName: cmHandleId, leaves: ['id': cmHandleId, 'cm-handle-state': 'ADVISED']) } def assertYamgModelCmHandleArgument(args, expectedCmHandleId) { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 43f492dbd..e5240c0e6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.api.inventory.sync import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle - +import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue import org.onap.cps.spi.model.DataNode @@ -34,28 +34,38 @@ class ModuleSyncWatchdogSpec extends Specification { def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE - BlockingQueue moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) + def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) def moduleSyncStartedOnCmHandles = [:] def mockModuleSyncTasks = Mock(ModuleSyncTasks) - def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks) + def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) + + def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, + mockModuleSyncTasks, spiedAsyncTaskExecutor) + + void setup() { + spiedAsyncTaskExecutor.setupThreadPool(); + } - def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() { + def 'Module sync advised cm handles with #scenario.'() { given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles' mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) + and: 'the executor has #parallelismLevel available threads' + spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel when: ' module sync is started' objectUnderTest.moduleSyncAdvisedCmHandles() then: 'it performs #expectedNumberOfTaskExecutions tasks' - expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_) - where: - scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions - 'less then 1 batch' | 1 || 1 - 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 - '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 - 'queue capacity' | testQueueCapacity || 3 - 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 + expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) + where: ' the following parameter are used' + scenario | parallelismLevel | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions + 'less then 1 batch' | 9 | 1 || 1 + 'exactly 1 batch' | 9 | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 + '2 batches' | 9 | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 + 'queue capacity' | 9 | testQueueCapacity || 3 + 'over queue capacity' | 9 | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 + 'not enough threads' | 2 | testQueueCapacity || 2 } def 'Reset failed cm handles.'() { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy new file mode 100644 index 000000000..ba1820e64 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync.executor + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification +import java.util.concurrent.TimeoutException +import java.util.function.Supplier + +@SpringBootTest(classes = AsyncTaskExecutor) +class AsyncTaskExecutorSpec extends Specification { + + @Autowired + AsyncTaskExecutor objectUnderTest + def mockTaskSupplier = Mock(Supplier) + + def 'Parallelism level configuration.'() { + expect: 'Parallelism level is configured with the correct value' + assert objectUnderTest.getAsyncTaskParallelismLevel() == 3 + } + + def 'Task completion with #caseDescriptor.'() { + when: 'task completion is handled' + def irrelevantResponse = null + objectUnderTest.handleTaskCompletion(irrelevantResponse, exception); + then: 'any exception is swallowed by the task completion (logged)' + noExceptionThrown() + where: 'following cases are tested' + caseDescriptor | exception + 'no exception' | null + 'time out exception' | new TimeoutException("time-out") + 'unexpected exception' | new Exception("some exception") + } + + def 'Task execution.'() { + when: 'a task is submitted for execution' + objectUnderTest.executeTask(() -> mockTaskSupplier, 0) + then: 'the task submission is successful' + noExceptionThrown() + } + +} -- cgit 1.2.3-korg