aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-application/src/main/resources/application.yml5
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java77
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java38
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java10
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/CmAvcEventPublisher.java14
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy63
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy18
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy49
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml4
-rw-r--r--csit/tests/cps-trust-level/cps-trust-level.robot12
-rw-r--r--docs/deployment.rst16
-rw-r--r--integration-test/src/test/resources/application.yml5
14 files changed, 54 insertions, 303 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 6b9c694cf2..6eb9e108be 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -106,6 +106,7 @@ app:
cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
+ inventory-events-topic: ncmp-inventory-events
lcm:
events:
topic: ${LCM_EVENTS_TOPIC:ncmp-events}
@@ -247,10 +248,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 10
-
model-loader:
maximum-attempt-count: 20
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
deleted file mode 100644
index 80bc4ab69f..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import jakarta.annotation.PostConstruct;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class AsyncTaskExecutor {
-
- @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}")
- @Getter
- private int asyncTaskParallelismLevel;
- private ExecutorService executorService;
- private static final int DEFAULT_PARALLELISM_LEVEL = 10;
-
- /**
- * Set up executor service with thread-pool size as per configuration parameter.
- * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
- */
- @PostConstruct
- public void setupThreadPool() {
- executorService = Executors.newWorkStealingPool(
- asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
- }
-
- /**
- * Execute supplied task asynchronously.
- *
- * @param taskSupplier functional method is get() task need to executed asynchronously
- * @param timeOutInMillis the task timeout value in milliseconds
- */
- public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
- CompletableFuture.supplyAsync(taskSupplier::get, executorService)
- .orTimeout(timeOutInMillis, MILLISECONDS)
- .whenCompleteAsync(this::handleTaskCompletion);
- }
-
- private void handleTaskCompletion(final Object response, final Throwable throwable) {
- if (throwable != null) {
- if (throwable instanceof TimeoutException) {
- log.error("Async task didn't complete within the required time.", throwable);
- } else {
- log.error("Watchdog async batch failed.", throwable);
- }
- }
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index f039cf3c02..b727e79e70 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -24,8 +24,6 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.exceptions.DataNodeNotFoundException;
@@ -51,12 +49,8 @@ public class ModuleSyncTasks {
* Perform module sync on a batch of cm handles.
*
* @param cmHandleIds a batch of cm handle ids to perform module sync on
- * @param batchCounter the number of batches currently being processed, will be decreased when
- * task is finished or fails
- * @return completed future to handle post-processing
*/
- public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
- final AtomicInteger batchCounter) {
+ public void performModuleSync(final Collection<String> cmHandleIds) {
final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
try {
for (final String cmHandleId : cmHandleIds) {
@@ -74,11 +68,8 @@ public class ModuleSyncTasks {
}
}
} finally {
- batchCounter.getAndDecrement();
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
- log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
}
- return CompletableFuture.completedFuture(null);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 32e1c49f17..6eefedb633 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,13 +27,9 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -46,16 +42,10 @@ public class ModuleSyncWatchdog {
private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
- private final AsyncTaskExecutor asyncTaskExecutor;
private final IMap<String, String> cpsAndNcmpLock;
- private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
- private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
- private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
- @Getter
- private AtomicInteger batchCounter = new AtomicInteger(1);
/**
* Check DB for any cm handles in 'ADVISED' state.
@@ -69,18 +59,11 @@ public class ModuleSyncWatchdog {
log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
- if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
- final Collection<String> nextBatch = prepareNextBatch();
- log.info("Processing module sync batch of {}. {} batch(es) active.",
- nextBatch.size(), batchCounter.get());
- if (!nextBatch.isEmpty()) {
- asyncTaskExecutor.executeTask(() ->
- moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
- ASYNC_TASK_TIMEOUT_IN_MILLISECONDS);
- batchCounter.getAndIncrement();
- }
- } else {
- preventBusyWait();
+ final Collection<String> nextBatch = prepareNextBatch();
+ if (!nextBatch.isEmpty()) {
+ log.info("Processing module sync batch of {}. 1 batch(es) active.", nextBatch.size());
+ moduleSyncTasks.performModuleSync(nextBatch);
+ log.info("Processing module sync batch finished. 0 batch(es) active.");
}
}
}
@@ -153,13 +136,4 @@ public class ModuleSyncWatchdog {
log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
-
- private void preventBusyWait() {
- try {
- log.debug("Busy waiting now");
- sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java
index f68bb3b543..692bf5caee 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelManager.java
@@ -193,14 +193,14 @@ public class TrustLevelManager {
final TrustLevel newEffectiveTrustLevel) {
if (oldEffectiveTrustLevel.equals(newEffectiveTrustLevel)) {
log.debug("The Cm Handle: {} has already the same trust level: {}", notificationCandidateCmHandleId,
- newEffectiveTrustLevel);
+ newEffectiveTrustLevel);
} else {
log.info("The trust level for Cm Handle: {} is now: {} ", notificationCandidateCmHandleId,
- newEffectiveTrustLevel);
+ newEffectiveTrustLevel);
cmAvcEventPublisher.publishAvcEvent(notificationCandidateCmHandleId,
- AVC_CHANGED_ATTRIBUTE_NAME,
- oldEffectiveTrustLevel.name(),
- newEffectiveTrustLevel.name());
+ AVC_CHANGED_ATTRIBUTE_NAME,
+ oldEffectiveTrustLevel.name(),
+ newEffectiveTrustLevel.name());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
deleted file mode 100644
index 7a02fa06e0..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.utils;
-
-import java.util.concurrent.TimeUnit;
-import org.springframework.stereotype.Service;
-
-/**
- * This class is to extract out sleep functionality so the interrupted exception handling can
- * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
- */
-@Service
-public class Sleeper {
- public void haveALittleRest(final long timeInMillis) throws InterruptedException {
- TimeUnit.MILLISECONDS.sleep(timeInMillis);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/CmAvcEventPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/CmAvcEventPublisher.java
index 2a9717cc1a..bdc7899724 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/CmAvcEventPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/CmAvcEventPublisher.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,8 +38,8 @@ public class CmAvcEventPublisher {
private final EventsPublisher<CloudEvent> eventsPublisher;
- @Value("${app.ncmp.avc.cm-events-topic}")
- private String avcTopic;
+ @Value("${app.ncmp.avc.inventory-events-topic}")
+ private String ncmpInventoryEventsTopicName;
/**
* Publish attribute value change event.
@@ -52,10 +52,10 @@ public class CmAvcEventPublisher {
final Map<String, String> extensions = createAvcEventExtensions(eventKey);
final CloudEvent avcCloudEvent =
- NcmpEvent.builder().type(AvcEvent.class.getTypeName())
- .data(avcEvent).extensions(extensions).build().asCloudEvent();
+ NcmpEvent.builder().type(AvcEvent.class.getTypeName())
+ .data(avcEvent).extensions(extensions).build().asCloudEvent();
- eventsPublisher.publishCloudEvent(avcTopic, eventKey, avcCloudEvent);
+ eventsPublisher.publishCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
}
private AvcEvent buildAvcEvent(final String attributeName,
@@ -78,4 +78,4 @@ public class CmAvcEventPublisher {
extensions.put("correlationid", eventKey);
return extensions;
}
-}
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
deleted file mode 100644
index 751c97a4d0..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutorSpec.groovy
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync
-
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import spock.lang.Specification
-
-import java.util.concurrent.TimeoutException
-import java.util.function.Supplier
-
-@SpringBootTest(classes = AsyncTaskExecutor)
-class AsyncTaskExecutorSpec extends Specification {
-
- @Autowired
- AsyncTaskExecutor objectUnderTest
- def mockTaskSupplier = Mock(Supplier<Object>)
-
- def 'Parallelism level configuration.'() {
- expect: 'Parallelism level is configured with the correct value'
- assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
- }
-
- def 'Task completion with #caseDescriptor.'() {
- when: 'task completion is handled'
- def irrelevantResponse = null
- objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
- then: 'any exception is swallowed by the task completion (logged)'
- noExceptionThrown()
- where: 'following cases are tested'
- caseDescriptor | exception
- 'no exception' | null
- 'time out exception' | new TimeoutException("time-out")
- 'unexpected exception' | new Exception("some exception")
- }
-
- def 'Task execution.'() {
- when: 'a task is submitted for execution'
- objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
- then: 'the task submission is successful'
- noExceptionThrown()
- }
-
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
index 92f4b38f31..98f3cc05bb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -39,8 +39,6 @@ import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler
import org.slf4j.LoggerFactory
import spock.lang.Specification
-import java.util.concurrent.atomic.AtomicInteger
-
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_SYNC_FAILED
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE
import static org.onap.cps.ncmp.api.inventory.models.LockReasonCategory.MODULE_UPGRADE_FAILED
@@ -70,8 +68,6 @@ class ModuleSyncTasksSpec extends Specification {
.getOrCreateHazelcastInstance(new Config('hazelcastInstanceName'))
.getMap('mapInstanceName')
- def batchCount = new AtomicInteger(5)
-
def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService,
mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles)
@@ -87,7 +83,7 @@ class ModuleSyncTasksSpec extends Specification {
mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> cmHandle1
mockInventoryPersistence.getYangModelCmHandle('cm-handle-2') >> cmHandle2
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'])
then: 'module sync service is invoked for each cm handle'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' }
@@ -95,8 +91,6 @@ class ModuleSyncTasksSpec extends Specification {
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
assertBatch(args, ['cm-handle-1', 'cm-handle-2'], CmHandleState.READY)
}
- and: 'batch count is decremented by one'
- assert batchCount.get() == 4
}
def 'Handle CM handle failure during #scenario and log MODULE_UPGRADE lock reason'() {
@@ -108,15 +102,13 @@ class ModuleSyncTasksSpec extends Specification {
mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { throw new Exception('some exception') }
mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { throw new Exception('some exception') }
when: 'module sync is executed'
- objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle'])
then: 'lock reason is updated with number of attempts'
1 * mockSyncUtils.updateLockReasonWithAttempts(_, expectedLockReasonCategory, 'some exception')
and: 'the state handler is called to update the state to LOCKED'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
assertBatch(args, ['cm-handle'], CmHandleState.LOCKED)
}
- and: 'batch count is decremented by one'
- assert batchCount.get() == 4
where:
scenario | lockReasonCategory | lockReasonDetails || expectedLockReasonCategory
'module sync' | MODULE_SYNC_FAILED | 'some lock details' || MODULE_SYNC_FAILED
@@ -132,7 +124,7 @@ class ModuleSyncTasksSpec extends Specification {
and: 'a cm handle in advised state'
mockInventoryPersistence.getYangModelCmHandle('cm-handle-3') >> cmHandleByIdAndState('cm-handle-3', CmHandleState.ADVISED)
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'])
then: 'no exception is thrown'
noExceptionThrown()
and: 'the deleted cm-handle did not sync'
@@ -176,7 +168,7 @@ class ModuleSyncTasksSpec extends Specification {
and: 'entry in progress map for other cm handle'
moduleSyncStartedOnCmHandles.put('other-cm-handle', 'started')
when: 'module sync poll is executed'
- objectUnderTest.performModuleSync(['cm-handle-1'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle-1'])
then: 'module sync service is invoked for cm handle'
1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
and: 'the entry for other cm handle is still in the progress map'
@@ -201,7 +193,7 @@ class ModuleSyncTasksSpec extends Specification {
cmHandle.compositeState.setLockReason(CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).build())
mockInventoryPersistence.getYangModelCmHandle('cm-handle') >> cmHandle
when: 'module sync is executed'
- objectUnderTest.performModuleSync(['cm-handle'], batchCount)
+ objectUnderTest.performModuleSync(['cm-handle'])
then: 'the module sync service should attempt to sync and upgrade the CM handle'
1 * mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { args ->
assert args[0].id == 'cm-handle'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index a9b88c2d3b..68aa6a1b6a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,14 +22,10 @@
package org.onap.cps.ncmp.impl.inventory.sync
import com.hazelcast.map.IMap
+import java.util.concurrent.ArrayBlockingQueue
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.impl.utils.Sleeper
-import org.onap.cps.api.model.DataNode
import spock.lang.Specification
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.locks.Lock
-
class ModuleSyncWatchdogSpec extends Specification {
def mockModuleOperationsUtils = Mock(ModuleOperationsUtils)
@@ -42,17 +38,9 @@ class ModuleSyncWatchdogSpec extends Specification {
def mockModuleSyncTasks = Mock(ModuleSyncTasks)
- def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
-
def mockCpsAndNcmpLock = Mock(IMap<String,String>)
- def spiedSleeper = Spy(Sleeper)
-
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
-
- void setup() {
- spiedAsyncTaskExecutor.setupThreadPool()
- }
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock)
def 'Module sync advised cm handles with #scenario.'() {
given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
@@ -61,12 +49,10 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor has enough available threads'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
- expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+ expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(*_)
and: 'the executing thread is unlocked'
1 * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following parameter are used'
@@ -84,12 +70,10 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor first has no threads but has one thread on the second attempt'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs one task'
- 1 * spiedAsyncTaskExecutor.executeTask(*_)
+ 1 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Module sync advised cm handle already handled by other thread.'() {
@@ -97,27 +81,21 @@ class ModuleSyncWatchdogSpec extends Specification {
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
- and: 'the executor has a thread available'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
- when: ' module sync is started'
+ when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it does NOT execute a task to process the (empty) batch'
- 0 * spiedAsyncTaskExecutor.executeTask(*_)
+ 0 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Module sync with previous cm handle(s) left in work queue.'() {
given: 'there is still a cm handle in the queue'
moduleSyncWorkQueue.offer('ch-1')
- and: 'sync utilities returns many advise cm handles'
- mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(500)
- and: 'the executor has plenty threads available'
- spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
- when: ' module sync is started'
+ when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it does executes only one task to process the remaining handle in the queue'
- 1 * spiedAsyncTaskExecutor.executeTask(*_)
+ 1 * mockModuleSyncTasks.performModuleSync(*_)
}
def 'Reset failed cm handles.'() {
@@ -147,15 +125,6 @@ class ModuleSyncWatchdogSpec extends Specification {
true || false || 1
}
- def 'Sleeper gets interrupted.'() {
- given: 'sleeper gets interrupted'
- spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
- when: 'the watchdog attempts to sleep to save cpu cycles'
- objectUnderTest.preventBusyWait()
- then: 'no exception is thrown'
- noExceptionThrown()
- }
-
def createCmHandleIds(numberOfCmHandles) {
return (numberOfCmHandles > 0) ? (1..numberOfCmHandles).collect { 'ch-'+it } : []
}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 12db639633..3276ceb534 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -77,10 +77,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 3
-
policy-executor:
enabled: true
defaultDecision: "some default decision"
diff --git a/csit/tests/cps-trust-level/cps-trust-level.robot b/csit/tests/cps-trust-level/cps-trust-level.robot
index 810bcf4d12..98ec665a6c 100644
--- a/csit/tests/cps-trust-level/cps-trust-level.robot
+++ b/csit/tests/cps-trust-level/cps-trust-level.robot
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,11 +46,11 @@ Register data node
Should Be Equal As Strings ${response.status_code} 200
Verify notification
- ${group_id}= Create Consumer auto_offset_reset=earliest
- Subscribe Topic topics=cm-events group_id=${group_id}
- ${result}= Poll group_id=${group_id} only_value=False poll_attempts=5
- ${headers} Set Variable ${result[0].headers()}
- ${payload} Set Variable ${result[0].value()}
+ ${group_id}= Create Consumer auto_offset_reset=earliest
+ Subscribe Topic topics=ncmp-inventory-events group_id=${group_id}
+ ${result}= Poll group_id=${group_id} only_value=False poll_attempts=5
+ ${headers} Set Variable ${result[0].headers()}
+ ${payload} Set Variable ${result[0].value()}
FOR ${header_key_value_pair} IN @{headers}
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0"
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "NCMP"
diff --git a/docs/deployment.rst b/docs/deployment.rst
index 2af0dd0cd5..840ab8e116 100644
--- a/docs/deployment.rst
+++ b/docs/deployment.rst
@@ -22,14 +22,24 @@ set appropriately. For example, given a database with 2GB of memory, 512MB is a
CPS and NCMP Configuration
==========================
+CPU and Memory Requirements
+---------------------------
+
+The following are minimum requirements for NCMP:
+
+* For 20,000 CM-handles: 2 CPUs and 2 GB RAM per instance, with 70% heap allocation.
+* For 50,000 CM-handles: 3 CPUs and 3 GB RAM per instance, with 70% heap allocation.
+
JVM Memory Allocation
+^^^^^^^^^^^^^^^^^^^^^
-Allocating 75% of the container's memory to the JVM heap ensures efficient memory management.
-This helps the JVM make the best use of the allocated resources while leaving enough memory for other processes.
+When running with 2 GB or more memory per instance, allocating 70% of the JVM memory to the heap ensures efficient
+memory management. It is not recommended to go above 70%.
.. code-block:: yaml
- JAVA_TOOL_OPTIONS: "-XX:InitialRAMPercentage=75.0 -XX:MaxRAMPercentage=75.0"
+ JAVA_TOOL_OPTIONS: "-XX:InitialRAMPercentage=70.0 -XX:MaxRAMPercentage=70.0"
+
Load balancer configuration
===========================
diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml
index 30598dfb90..e213a70a59 100644
--- a/integration-test/src/test/resources/application.yml
+++ b/integration-test/src/test/resources/application.yml
@@ -102,6 +102,7 @@ app:
cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
cm-subscription-ncmp-out: ${CM_SUBSCRIPTION_NCMP_OUT_TOPIC:subscription-response}
cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
+ inventory-events-topic: ncmp-inventory-events
lcm:
events:
topic: ${LCM_EVENTS_TOPIC:ncmp-events}
@@ -189,10 +190,6 @@ ncmp:
trust-level:
dmi-availability-watchdog-ms: 30000
- modules-sync-watchdog:
- async-executor:
- parallelism-level: 2
-
model-loader:
maximum-attempt-count: 20