diff options
9 files changed, 153 insertions, 37 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index def006c9ae..dd83795f74 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -100,8 +100,8 @@ notification: enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}
async:
executor:
- core-pool-size: 2
- max-pool-size: 10
+ core-pool-size: 10
+ max-pool-size: 100
queue-capacity: 500
wait-for-tasks-to-complete-on-shutdown: true
thread-name-prefix: Async-
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java index af33651ebe..a41815554f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java @@ -32,11 +32,9 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import org.springframework.http.MediaType; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; -@EnableScheduling @Configuration @RequiredArgsConstructor(access = AccessLevel.PROTECTED) public class NcmpConfiguration { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index f18d843f81..3f81194fe1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -54,8 +54,7 @@ public class ModuleSyncWatchdog { */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") public void executeAdvisedCmHandlePoll() { - YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle(); - while (advisedCmHandle != null) { + syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> { final String cmHandleId = advisedCmHandle.getId(); final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); try { @@ -69,8 +68,7 @@ public class ModuleSyncWatchdog { } inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); - advisedCmHandle = syncUtils.getAnAdvisedCmHandle(); - } + }); log.debug("No Cm-Handles currently found in an ADVISED state"); } @@ -85,7 +83,7 @@ public class ModuleSyncWatchdog { final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); if (isReadyForRetry) { setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState); - log.debug("Locked cm handle {} is being resynced", lockedCmHandle.getId()); + log.debug("Locked cm handle {} is being re-synced", lockedCmHandle.getId()); inventoryPersistence.saveCmHandleState(lockedCmHandle.getId(), compositeState); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java index 2b80b9d53d..467fd8f608 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java @@ -22,10 +22,10 @@ package org.onap.cps.ncmp.api.inventory.sync; import com.fasterxml.jackson.databind.JsonNode; -import java.security.SecureRandom; import java.time.Duration; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -55,9 +55,6 @@ import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class SyncUtils { - - private static final SecureRandom secureRandom = new SecureRandom(); - private final InventoryPersistence inventoryPersistence; private final DmiDataOperations dmiDataOperations; @@ -69,17 +66,17 @@ public class SyncUtils { /** * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing. * - * @return a random yang model cm handle with an ADVISED state, return null if not found + * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found */ - public YangModelCmHandle getAnAdvisedCmHandle() { - final List<DataNode> advisedCmHandles = inventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED); - if (advisedCmHandles.isEmpty()) { - return null; + public List<YangModelCmHandle> getAdvisedCmHandles() { + final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>( + inventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED)); + log.info("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size()); + if (advisedCmHandlesAsDataNodeList.isEmpty()) { + return Collections.emptyList(); } - final int randomElementIndex = secureRandom.nextInt(advisedCmHandles.size()); - final String cmHandleId = advisedCmHandles.get(randomElementIndex).getLeaves() - .get("id").toString(); - return inventoryPersistence.getYangModelCmHandle(cmHandleId); + Collections.shuffle(advisedCmHandlesAsDataNodeList); + return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList); } /** @@ -113,12 +110,10 @@ public class SyncUtils { * @return a random LOCKED yang model cm handle, return null if not found */ public List<YangModelCmHandle> getModuleSyncFailedCmHandles() { - final List<DataNode> lockedCmHandleAsDataNodeList = inventoryPersistence.getCmHandleDataNodesByCpsPath( + final List<DataNode> lockedCmHandlesAsDataNodeList = inventoryPersistence.getCmHandleDataNodesByCpsPath( "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]/ancestor::cm-handles", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); - return lockedCmHandleAsDataNodeList.stream() - .map(cmHandle -> YangDataConverter.convertCmHandleToYangModel(cmHandle, - cmHandle.getLeaves().get("id").toString())).collect(Collectors.toList()); + return convertCmHandlesDataNodesToYangModelCmHandles(lockedCmHandlesAsDataNodeList); } /** @@ -191,4 +186,10 @@ public class SyncUtils { final Map.Entry<String, JsonNode> firstElement = overallJsonTreeMap.next(); return jsonObjectMapper.asJsonString(Map.of(firstElement.getKey(), firstElement.getValue())); } + + private List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles( + final List<DataNode> cmHandlesAsDataNodeList) { + return cmHandlesAsDataNodeList.stream().map(dataNode -> YangDataConverter.convertCmHandleToYangModel(dataNode, + dataNode.getLeaves().get("id").toString())).collect(Collectors.toList()); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java new file mode 100644 index 0000000000..196a655ca4 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync.config; + +import java.util.concurrent.ThreadPoolExecutor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; + +@Configuration +@EnableScheduling +public class WatchdogSchedulingConfigurer implements SchedulingConfigurer { + + @Override + public void configureTasks(final ScheduledTaskRegistrar scheduledTaskRegistrar) { + scheduledTaskRegistrar.setTaskScheduler(taskScheduler()); + } + + /** + * Implementation of Spring's {@link TaskScheduler} interface, wrapping + * a native {@link org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler} for watchdogs. + */ + @Bean + public TaskScheduler taskScheduler() { + final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setPoolSize(10); + taskScheduler.setThreadNamePrefix("watchdog-th-"); + taskScheduler.setAwaitTerminationSeconds(60); + taskScheduler.setWaitForTasksToCompleteOnShutdown(true); + taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + taskScheduler.initialize(); + return taskScheduler; + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 4b92be37ab..40a0e39b9b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -50,7 +50,7 @@ class ModuleSyncWatchdogSpec extends Specification { def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2) objectUnderTest.isGlobalDataSyncCacheEnabled = dataSyncCacheEnabled and: 'sync utilities return a cm handle twice' - mockSyncUtils.getAnAdvisedCmHandle() >>> [yangModelCmHandle1, yangModelCmHandle2, null] + mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2] when: 'module sync poll is executed' objectUnderTest.executeAdvisedCmHandlePoll() then: 'the inventory persistence cm handle returns a composite state for the first cm handle' @@ -84,7 +84,7 @@ class ModuleSyncWatchdogSpec extends Specification { def compositeState = new CompositeState(cmHandleState: cmHandleState) def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState) and: 'sync utilities return a cm handle' - mockSyncUtils.getAnAdvisedCmHandle() >>> [yangModelCmHandle, null] + mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle] when: 'module sync poll is executed' objectUnderTest.executeAdvisedCmHandlePoll() then: 'the inventory persistence cm handle returns a composite state for the cm handle' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy index 134ee38da7..6c2d8f15b3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy @@ -41,6 +41,7 @@ import spock.lang.Specification import java.time.OffsetDateTime import java.time.format.DateTimeFormatter +import java.util.stream.Collectors class SyncUtilsSpec extends Specification{ @@ -61,17 +62,17 @@ class SyncUtilsSpec extends Specification{ def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() { given: 'the inventory persistence service returns a collection of data nodes' mockInventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection - when: 'get advised cm handle is called' - objectUnderTest.getAnAdvisedCmHandle() + when: 'get advised cm handles are fetched' + def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles() then: 'the returned data node collection is the correct size' - dataNodeCollection.size() == expectedDataNodeSize - and: 'get yang model cm handles is invoked the correct number of times' - expectedCallsToGetYangModelCmHandle * mockInventoryPersistence.getYangModelCmHandle('cm-handle-123') + yangModelCmHandles.size() == expectedDataNodeSize + and: 'yang model collection contains the correct data' + yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) == + dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet()) where: 'the following scenarios are used' scenario | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize - 'exists' | [ dataNode ] || 1 | 1 - 'does not exist' | [ ] || 0 | 0 - + 'exists' | [dataNode] || 1 | 1 + 'does not exist' | [] || 0 | 0 } def 'Update Lock Reason, Details and Attempts where lock reason #scenario'() { @@ -120,7 +121,7 @@ class SyncUtilsSpec extends Specification{ given: 'the inventory persistence service returns a collection of data nodes' mockInventoryPersistence.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes mockInventoryPersistence.getCmHandlesByIdAndState("cm-handle-123", CmHandleState.READY) >> readyDataNodes - when: 'get advised cm handle is called' + when: 'get advised cm handles are fetched' objectUnderTest.getAnUnSynchronizedReadyCmHandle() then: 'the returned data node collection is the correct size' readyDataNodes.size() == expectedDataNodeSize diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy new file mode 100644 index 0000000000..d4010aa781 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync.config + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.test.context.ContextConfiguration +import spock.lang.Specification + +@SpringBootTest +@ContextConfiguration(classes = [ConfigurableApplicationContext, WatchdogSchedulingConfigurer]) +class WatchdogSchedulingConfigurerSpec extends Specification { + + @Autowired + private ConfigurableApplicationContext applicationContext; + + def watchdogSchedulingConfigurer; + + @BeforeEach + void setup() { + watchdogSchedulingConfigurer = (WatchdogSchedulingConfigurer) applicationContext.getBean("watchdogSchedulingConfigurer") + } + + @AfterEach + void tearDown() { + if (applicationContext != null) { + applicationContext.close() + } + } + + def 'Validate watchdog scheduling configuration'() { + given: 'task scheduler configuration properties are loaded as map' + def linkedHashMap = watchdogSchedulingConfigurer.taskScheduler().getProperties() + expect: 'thread name prefix is mapped correctly' + assert linkedHashMap.'threadNamePrefix' == 'watchdog-th-' + } +} diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java index 2d8f7fb08b..9327c53451 100644 --- a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java +++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java @@ -21,6 +21,7 @@ package org.onap.cps.config; +import java.util.concurrent.ThreadPoolExecutor; import javax.validation.constraints.Min; import lombok.Setter; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -61,7 +62,10 @@ public class AsyncConfig { executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown); + executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix(threadNamePrefix); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + executor.initialize(); return executor; } |