summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-application/src/main/resources/application.yml4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java2
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java56
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy4
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy19
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy58
-rw-r--r--cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java4
9 files changed, 153 insertions, 37 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index def006c9ae..dd83795f74 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -100,8 +100,8 @@ notification:
enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}
async:
executor:
- core-pool-size: 2
- max-pool-size: 10
+ core-pool-size: 10
+ max-pool-size: 100
queue-capacity: 500
wait-for-tasks-to-complete-on-shutdown: true
thread-name-prefix: Async-
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java
index af33651ebe..a41815554f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java
@@ -32,11 +32,9 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
-import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
-@EnableScheduling
@Configuration
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public class NcmpConfiguration {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index f18d843f81..3f81194fe1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -54,8 +54,7 @@ public class ModuleSyncWatchdog {
*/
@Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
public void executeAdvisedCmHandlePoll() {
- YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
- while (advisedCmHandle != null) {
+ syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> {
final String cmHandleId = advisedCmHandle.getId();
final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
try {
@@ -69,8 +68,7 @@ public class ModuleSyncWatchdog {
}
inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
- advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
- }
+ });
log.debug("No Cm-Handles currently found in an ADVISED state");
}
@@ -85,7 +83,7 @@ public class ModuleSyncWatchdog {
final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
if (isReadyForRetry) {
setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
- log.debug("Locked cm handle {} is being resynced", lockedCmHandle.getId());
+ log.debug("Locked cm handle {} is being re-synced", lockedCmHandle.getId());
inventoryPersistence.saveCmHandleState(lockedCmHandle.getId(), compositeState);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
index 2b80b9d53d..467fd8f608 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
@@ -22,10 +22,10 @@
package org.onap.cps.ncmp.api.inventory.sync;
import com.fasterxml.jackson.databind.JsonNode;
-import java.security.SecureRandom;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -55,9 +55,6 @@ import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class SyncUtils {
-
- private static final SecureRandom secureRandom = new SecureRandom();
-
private final InventoryPersistence inventoryPersistence;
private final DmiDataOperations dmiDataOperations;
@@ -69,17 +66,17 @@ public class SyncUtils {
/**
* Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing.
*
- * @return a random yang model cm handle with an ADVISED state, return null if not found
+ * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found
*/
- public YangModelCmHandle getAnAdvisedCmHandle() {
- final List<DataNode> advisedCmHandles = inventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED);
- if (advisedCmHandles.isEmpty()) {
- return null;
+ public List<YangModelCmHandle> getAdvisedCmHandles() {
+ final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
+ inventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED));
+ log.info("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
+ if (advisedCmHandlesAsDataNodeList.isEmpty()) {
+ return Collections.emptyList();
}
- final int randomElementIndex = secureRandom.nextInt(advisedCmHandles.size());
- final String cmHandleId = advisedCmHandles.get(randomElementIndex).getLeaves()
- .get("id").toString();
- return inventoryPersistence.getYangModelCmHandle(cmHandleId);
+ Collections.shuffle(advisedCmHandlesAsDataNodeList);
+ return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList);
}
/**
@@ -113,12 +110,10 @@ public class SyncUtils {
* @return a random LOCKED yang model cm handle, return null if not found
*/
public List<YangModelCmHandle> getModuleSyncFailedCmHandles() {
- final List<DataNode> lockedCmHandleAsDataNodeList = inventoryPersistence.getCmHandleDataNodesByCpsPath(
+ final List<DataNode> lockedCmHandlesAsDataNodeList = inventoryPersistence.getCmHandleDataNodesByCpsPath(
"//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]/ancestor::cm-handles",
FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
- return lockedCmHandleAsDataNodeList.stream()
- .map(cmHandle -> YangDataConverter.convertCmHandleToYangModel(cmHandle,
- cmHandle.getLeaves().get("id").toString())).collect(Collectors.toList());
+ return convertCmHandlesDataNodesToYangModelCmHandles(lockedCmHandlesAsDataNodeList);
}
/**
@@ -191,4 +186,10 @@ public class SyncUtils {
final Map.Entry<String, JsonNode> firstElement = overallJsonTreeMap.next();
return jsonObjectMapper.asJsonString(Map.of(firstElement.getKey(), firstElement.getValue()));
}
+
+ private List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles(
+ final List<DataNode> cmHandlesAsDataNodeList) {
+ return cmHandlesAsDataNodeList.stream().map(dataNode -> YangDataConverter.convertCmHandleToYangModel(dataNode,
+ dataNode.getLeaves().get("id").toString())).collect(Collectors.toList());
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java
new file mode 100644
index 0000000000..196a655ca4
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.config;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+@Configuration
+@EnableScheduling
+public class WatchdogSchedulingConfigurer implements SchedulingConfigurer {
+
+ @Override
+ public void configureTasks(final ScheduledTaskRegistrar scheduledTaskRegistrar) {
+ scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
+ }
+
+ /**
+ * Implementation of Spring's {@link TaskScheduler} interface, wrapping
+ * a native {@link org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler} for watchdogs.
+ */
+ @Bean
+ public TaskScheduler taskScheduler() {
+ final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+ taskScheduler.setPoolSize(10);
+ taskScheduler.setThreadNamePrefix("watchdog-th-");
+ taskScheduler.setAwaitTerminationSeconds(60);
+ taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
+ taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ taskScheduler.initialize();
+ return taskScheduler;
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 4b92be37ab..40a0e39b9b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -50,7 +50,7 @@ class ModuleSyncWatchdogSpec extends Specification {
def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2)
objectUnderTest.isGlobalDataSyncCacheEnabled = dataSyncCacheEnabled
and: 'sync utilities return a cm handle twice'
- mockSyncUtils.getAnAdvisedCmHandle() >>> [yangModelCmHandle1, yangModelCmHandle2, null]
+ mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
when: 'module sync poll is executed'
objectUnderTest.executeAdvisedCmHandlePoll()
then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
@@ -84,7 +84,7 @@ class ModuleSyncWatchdogSpec extends Specification {
def compositeState = new CompositeState(cmHandleState: cmHandleState)
def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
and: 'sync utilities return a cm handle'
- mockSyncUtils.getAnAdvisedCmHandle() >>> [yangModelCmHandle, null]
+ mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle]
when: 'module sync poll is executed'
objectUnderTest.executeAdvisedCmHandlePoll()
then: 'the inventory persistence cm handle returns a composite state for the cm handle'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
index 134ee38da7..6c2d8f15b3 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
@@ -41,6 +41,7 @@ import spock.lang.Specification
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
+import java.util.stream.Collectors
class SyncUtilsSpec extends Specification{
@@ -61,17 +62,17 @@ class SyncUtilsSpec extends Specification{
def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
given: 'the inventory persistence service returns a collection of data nodes'
mockInventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
- when: 'get advised cm handle is called'
- objectUnderTest.getAnAdvisedCmHandle()
+ when: 'get advised cm handles are fetched'
+ def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles()
then: 'the returned data node collection is the correct size'
- dataNodeCollection.size() == expectedDataNodeSize
- and: 'get yang model cm handles is invoked the correct number of times'
- expectedCallsToGetYangModelCmHandle * mockInventoryPersistence.getYangModelCmHandle('cm-handle-123')
+ yangModelCmHandles.size() == expectedDataNodeSize
+ and: 'yang model collection contains the correct data'
+ yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) ==
+ dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet())
where: 'the following scenarios are used'
scenario | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
- 'exists' | [ dataNode ] || 1 | 1
- 'does not exist' | [ ] || 0 | 0
-
+ 'exists' | [dataNode] || 1 | 1
+ 'does not exist' | [] || 0 | 0
}
def 'Update Lock Reason, Details and Attempts where lock reason #scenario'() {
@@ -120,7 +121,7 @@ class SyncUtilsSpec extends Specification{
given: 'the inventory persistence service returns a collection of data nodes'
mockInventoryPersistence.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
mockInventoryPersistence.getCmHandlesByIdAndState("cm-handle-123", CmHandleState.READY) >> readyDataNodes
- when: 'get advised cm handle is called'
+ when: 'get advised cm handles are fetched'
objectUnderTest.getAnUnSynchronizedReadyCmHandle()
then: 'the returned data node collection is the correct size'
readyDataNodes.size() == expectedDataNodeSize
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy
new file mode 100644
index 0000000000..d4010aa781
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.config
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.context.ConfigurableApplicationContext
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+@SpringBootTest
+@ContextConfiguration(classes = [ConfigurableApplicationContext, WatchdogSchedulingConfigurer])
+class WatchdogSchedulingConfigurerSpec extends Specification {
+
+ @Autowired
+ private ConfigurableApplicationContext applicationContext;
+
+ def watchdogSchedulingConfigurer;
+
+ @BeforeEach
+ void setup() {
+ watchdogSchedulingConfigurer = (WatchdogSchedulingConfigurer) applicationContext.getBean("watchdogSchedulingConfigurer")
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (applicationContext != null) {
+ applicationContext.close()
+ }
+ }
+
+ def 'Validate watchdog scheduling configuration'() {
+ given: 'task scheduler configuration properties are loaded as map'
+ def linkedHashMap = watchdogSchedulingConfigurer.taskScheduler().getProperties()
+ expect: 'thread name prefix is mapped correctly'
+ assert linkedHashMap.'threadNamePrefix' == 'watchdog-th-'
+ }
+}
diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
index 2d8f7fb08b..9327c53451 100644
--- a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
+++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
@@ -21,6 +21,7 @@
package org.onap.cps.config;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.validation.constraints.Min;
import lombok.Setter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -61,7 +62,10 @@ public class AsyncConfig {
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
+ executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix(threadNamePrefix);
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ executor.initialize();
return executor;
}