aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/pom.xml6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java7
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java21
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java84
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java19
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy4
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy28
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy4
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy71
14 files changed, 202 insertions, 88 deletions
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml
index 9ce9d51363..fda4221f6f 100644
--- a/cps-ncmp-service/pom.xml
+++ b/cps-ncmp-service/pom.xml
@@ -27,7 +27,7 @@
<parent>
<groupId>org.onap.cps</groupId>
<artifactId>cps-parent</artifactId>
- <version>3.5.4-SNAPSHOT</version>
+ <version>3.5.5-SNAPSHOT</version>
<relativePath>../cps-parent/pom.xml</relativePath>
</parent>
@@ -74,10 +74,6 @@
<artifactId>cps-path-parser</artifactId>
</dependency>
<dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>annotations</artifactId>
- </dependency>
- <dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-spring</artifactId>
</dependency>
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
index 5331c13bd1..e9e5f5499f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
@@ -62,15 +62,16 @@ public class NetworkCmProxyInventoryFacade {
private final TrustLevelManager trustLevelManager;
private final AlternateIdMatcher alternateIdMatcher;
+
+
/**
* Registration of Created, Removed, Updated or Upgraded CM Handles.
*
* @param dmiPluginRegistration Dmi Plugin Registration details
* @return dmiPluginRegistrationResponse
*/
- public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule(
- final DmiPluginRegistration dmiPluginRegistration) {
- return cmHandleRegistrationService.updateDmiRegistrationAndSyncModule(dmiPluginRegistration);
+ public DmiPluginRegistrationResponse updateDmiRegistration(final DmiPluginRegistration dmiPluginRegistration) {
+ return cmHandleRegistrationService.updateDmiRegistration(dmiPluginRegistration);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
index ad8025b5dc..109a541cb3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
@@ -57,13 +57,13 @@ public class HazelcastCacheConfig {
final Config config = getHazelcastInstanceConfig(instanceConfigName);
config.setClusterName(clusterName);
config.setClassLoader(org.onap.cps.spi.model.Dataspace.class.getClassLoader());
- dataStructuresConfig(namedConfig, config);
+ configureDataStructures(namedConfig, config);
exposeClusterInformation(config);
updateDiscoveryMode(config);
return config;
}
- private static void dataStructuresConfig(final NamedConfig namedConfig, final Config config) {
+ private static void configureDataStructures(final NamedConfig namedConfig, final Config config) {
if (namedConfig instanceof MapConfig) {
config.addMapConfig((MapConfig) namedConfig);
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
index d9f7e38993..cb55b09d41 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
@@ -87,8 +87,7 @@ public class CmHandleRegistrationService {
* @param dmiPluginRegistration Dmi Plugin Registration details
* @return dmiPluginRegistrationResponse
*/
- public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule(
- final DmiPluginRegistration dmiPluginRegistration) {
+ public DmiPluginRegistrationResponse updateDmiRegistration(final DmiPluginRegistration dmiPluginRegistration) {
dmiPluginRegistration.validateDmiPluginRegistration();
final DmiPluginRegistrationResponse dmiPluginRegistrationResponse = new DmiPluginRegistrationResponse();
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index c6deb79d4d..e627f8f894 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -95,22 +95,21 @@ public class ModuleSyncTasks {
}
/**
- * Resets the state of failed CM handles and updates their status to ADVISED for retry.
-
- * This method processes a collection of failed CM handles, logs their lock reason, and resets their state
+ * Set the state of CM handles to ADVISED.
+ * This method processes a collection of CM handles, logs their lock reason, and resets their state
* to ADVISED. Once reset, it updates the CM handle states in a batch to allow for re-attempt by the module-sync
* watchdog.
*
- * @param failedCmHandles a collection of CM handles that have failed and need their state reset
+ * @param yangModelCmHandles a collection of CM handles that needs their state reset
*/
- public void resetFailedCmHandles(final Collection<YangModelCmHandle> failedCmHandles) {
- final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size());
- for (final YangModelCmHandle failedCmHandle : failedCmHandles) {
- final CompositeState compositeState = failedCmHandle.getCompositeState();
- final String resetCmHandleId = failedCmHandle.getId();
+ public void setCmHandlesToAdvised(final Collection<YangModelCmHandle> yangModelCmHandles) {
+ final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(yangModelCmHandles.size());
+ for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) {
+ final CompositeState compositeState = yangModelCmHandle.getCompositeState();
+ final String resetCmHandleId = yangModelCmHandle.getId();
log.debug("Resetting CM handle {} state to ADVISED for retry by the module-sync watchdog. Lock reason: {}",
- failedCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name());
- cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED);
+ yangModelCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name());
+ cmHandleStatePerCmHandle.put(yangModelCmHandle, CmHandleState.ADVISED);
removeResetCmHandleFromModuleSyncMap(resetCmHandleId);
}
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index bc7d6cdf67..898b8d5bf4 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -27,10 +27,12 @@ import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
+import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.onap.cps.spi.model.DataNode;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -45,6 +47,9 @@ public class ModuleSyncWatchdog {
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
+ private final Lock workQueueLock;
+ private final Sleeper sleeper;
+
private static final int MODULE_SYNC_BATCH_SIZE = 100;
private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
@@ -56,11 +61,12 @@ public class ModuleSyncWatchdog {
* Check DB for any cm handles in 'ADVISED' state.
* Queue and create batches to process them asynchronously.
* This method will only finish when there are no more 'ADVISED' cm handles in the DB.
- * This method wil be triggered on a configurable interval
+ * This method is triggered on a configurable interval (ncmp.timers.advised-modules-sync.sleep-time-ms)
*/
- @Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
+ @Scheduled(initialDelayString = "${test.ncmp.timers.advised-modules-sync.initial-delay-ms:0}",
+ fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
public void moduleSyncAdvisedCmHandles() {
- log.info("Processing module sync watchdog waking up.");
+ log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
@@ -80,36 +86,50 @@ public class ModuleSyncWatchdog {
}
/**
- * Find any failed (locked) cm handles and change state back to 'ADVISED'.
+ * Populate work queue with advised cm handles from db.
+ * This method is made public for (integration) testing purposes.
+ * So it can be tested without the queue being emptied immediately as the main public method does.
*/
- @Scheduled(fixedDelayString = "${ncmp.timers.locked-modules-sync.sleep-time-ms:15000}")
- public void resetPreviouslyFailedCmHandles() {
- log.info("Processing module sync retry-watchdog waking up.");
- final Collection<YangModelCmHandle> failedCmHandles
- = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade();
- log.info("Retrying {} cmHandles", failedCmHandles.size());
- moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
+ public void populateWorkQueueIfNeeded() {
+ if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) {
+ try {
+ populateWorkQueue();
+ if (moduleSyncWorkQueue.isEmpty()) {
+ setPreviouslyLockedCmHandlesToAdvised();
+ }
+ } finally {
+ workQueueLock.unlock();
+ }
+ }
}
- private void preventBusyWait() {
- try {
- log.info("Busy waiting now");
- TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
+ private void populateWorkQueue() {
+ final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
+ if (advisedCmHandles.isEmpty()) {
+ log.debug("No advised CM handles found in DB.");
+ } else {
+ log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size());
+ advisedCmHandles.forEach(advisedCmHandle -> {
+ final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id"));
+ if (moduleSyncWorkQueue.offer(advisedCmHandle)) {
+ log.info("CM handle {} added to the work queue.", cmHandleId);
+ } else {
+ log.warn("Failed to add CM handle {} to the work queue.", cmHandleId);
+ }
+ });
+ log.info("Work queue contains {} items.", moduleSyncWorkQueue.size());
}
}
- private void populateWorkQueueIfNeeded() {
- if (moduleSyncWorkQueue.isEmpty()) {
- final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
- log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size());
- for (final DataNode advisedCmHandle : advisedCmHandles) {
- if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
- log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
- }
- }
- log.info("Work Queue Size : {}", moduleSyncWorkQueue.size());
+ private void setPreviouslyLockedCmHandlesToAdvised() {
+ final Collection<YangModelCmHandle> lockedCmHandles
+ = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade();
+ if (lockedCmHandles.isEmpty()) {
+ log.debug("No locked CM handles found in DB.");
+ } else {
+ log.info("Found {} Locked CM Handles. Changing state to Advise to retry syncing them again.",
+ lockedCmHandles.size());
+ moduleSyncTasks.setCmHandlesToAdvised(lockedCmHandles);
}
}
@@ -130,8 +150,16 @@ public class ModuleSyncWatchdog {
nextBatch.add(batchCandidate);
}
}
- log.debug("nextBatch size : {}", nextBatch.size());
+ log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
+ private void preventBusyWait() {
+ try {
+ log.debug("Busy waiting now");
+ sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
index c5fae0d166..1f33cc349d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
@@ -24,6 +24,7 @@ import com.hazelcast.config.MapConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.onap.cps.spi.model.DataNode;
@@ -43,6 +44,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig");
private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
+ private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
/**
* Module Sync Distributed Queue Instance.
@@ -74,4 +76,21 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
public IMap<String, Boolean> dataSyncSemaphores() {
return getOrCreateHazelcastInstance(dataSyncSemaphoresConfig).getMap("dataSyncSemaphores");
}
+
+ /**
+ * Retrieves a distributed lock used to control access to the work queue for module synchronization.
+ * This lock ensures that the population and modification of the work queue are thread-safe and
+ * protected from concurrent access across different nodes in the distributed system.
+ * The lock guarantees that only one instance of the application can populate or modify the
+ * module sync work queue at a time, preventing race conditions and potential data inconsistencies.
+ * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides
+ * strong consistency guarantees for distributed operations.
+ *
+ * @return a {@link Lock} instance used for synchronizing access to the work queue.
+ */
+ @Bean
+ public Lock workQueueLock() {
+ // TODO Method below does not use commonQueueConfig for creating lock (Refactor later)
+ return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE);
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
new file mode 100644
index 0000000000..7a02fa06e0
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.utils;
+
+import java.util.concurrent.TimeUnit;
+import org.springframework.stereotype.Service;
+
+/**
+ * This class is to extract out sleep functionality so the interrupted exception handling can
+ * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
+ */
+@Service
+public class Sleeper {
+ public void haveALittleRest(final long timeInMillis) throws InterruptedException {
+ TimeUnit.MILLISECONDS.sleep(timeInMillis);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java
index d8e8350345..8ae942eb7b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java
@@ -20,7 +20,6 @@
package org.onap.cps.ncmp.impl.utils.http;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
@@ -63,7 +62,6 @@ public class WebClientConfiguration {
.compress(true);
}
- @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
private static ConnectionProvider getConnectionProvider(final ServiceConfig serviceConfig) {
return ConnectionProvider.builder(serviceConfig.getConnectionProviderName())
.maxConnections(serviceConfig.getMaximumConnectionsTotal())
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
index dc38e0fc9b..0bd838437d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
@@ -29,10 +29,10 @@ class HazelcastCacheConfigSpec extends Specification {
def objectUnderTest = new HazelcastCacheConfig()
def 'Create Hazelcast instance with a #scenario'() {
- given: 'a cluster name and instance name'
+ given: 'a cluster name and instance config name'
objectUnderTest.clusterName = 'my cluster'
objectUnderTest.instanceConfigName = 'my instance config'
- when: 'an hazelcast instance is created (name has to be unique)'
+ when: 'a hazelcast instance is created (name has to be unique)'
def result = objectUnderTest.getOrCreateHazelcastInstance(config)
then: 'the instance is created and has the correct name'
assert result.name == 'my instance config'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy
index dcff2e9b89..70e26d993c 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy
@@ -87,7 +87,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'cm handle is in READY state'
mockCmHandleQueries.cmHandleHasState('cmhandle-3', CmHandleState.READY) >> true
when: 'registration is processed'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration)
+ objectUnderTest.updateDmiRegistration(dmiRegistration)
then: 'cm-handles are removed first'
1 * objectUnderTest.processRemovedCmHandles(*_)
and: 'de-registered cm handle entry is removed from in progress map'
@@ -108,7 +108,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'exception while checking cm handle state'
mockInventoryPersistence.getYangModelCmHandle('cmhandle-3') >> new YangModelCmHandle(id: 'cmhandle-3', moduleSetTag: '', compositeState: new CompositeState(cmHandleState: cmHandleState))
when: 'registration is processed'
- def result = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration)
+ def result = objectUnderTest.updateDmiRegistration(dmiRegistration)
then: 'upgrade operation contains expected error code'
assert result.upgradedCmHandles[0].status == expectedResponseStatus
where: 'the following parameters are used'
@@ -124,7 +124,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'exception while checking cm handle state'
mockInventoryPersistence.getYangModelCmHandle('cmhandle-3') >> { throw exception }
when: 'registration is processed'
- def result = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration)
+ def result = objectUnderTest.updateDmiRegistration(dmiRegistration)
then: 'upgrade operation contains expected error code'
assert result.upgradedCmHandles.ncmpResponseStatus.code[0] == expectedErrorCode
where: 'the following parameters are used'
@@ -139,7 +139,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
dmiDataPlugin: dmiDataPlugin)
dmiPluginRegistration.createdCmHandles = [ncmpServiceCmHandle]
when: 'update registration and sync module is called with correct DMI plugin information'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'create cm handles registration and sync modules is called with the correct plugin information'
1 * objectUnderTest.processCreatedCmHandles(dmiPluginRegistration, _)
where:
@@ -155,7 +155,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
dmiDataPlugin: dmiDataPlugin)
dmiPluginRegistration.createdCmHandles = [ncmpServiceCmHandle]
when: 'registration is called with incorrect DMI plugin information'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a DMI Request Exception is thrown with correct message details'
def exceptionThrown = thrown(DmiRequestException.class)
assert exceptionThrown.getMessage().contains(expectedMessageDetails)
@@ -178,7 +178,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: 'my-server')
dmiPluginRegistration.createdCmHandles = [new NcmpServiceCmHandle(cmHandleId: 'cmhandle', dmiProperties: dmiProperties, publicProperties: publicProperties)]
when: 'registration is updated'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a successful response is received'
response.createdCmHandles.size() == 1
with(response.createdCmHandles[0]) {
@@ -206,7 +206,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: 'my-server',
createdCmHandles:[new NcmpServiceCmHandle(cmHandleId: 'ch-1', registrationTrustLevel: registrationTrustLevel)])
when: 'registration is updated'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'trustLevel is set for the created cm-handle'
1 * mockTrustLevelManager.registerCmHandles(expectedMapping)
where:
@@ -225,7 +225,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
def xpath = "somePathWithId[@id='cmhandle2']"
mockLcmEventsCmHandleStateHandler.initiateStateAdvised(*_) >> { throw AlreadyDefinedException.forDataNodes([xpath], 'some-context') }
when: 'registration is updated to create cm-handles'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a response is received for all cm-handles'
response.createdCmHandles.size() == 1
and: 'all cm-handles creation fails'
@@ -244,7 +244,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'cm-handler registration fails: #scenario'
mockLcmEventsCmHandleStateHandler.initiateStateAdvised(*_) >> { throw exception }
when: 'registration is updated'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a failure response is received'
response.createdCmHandles.size() == 1
with(response.createdCmHandles[0]) {
@@ -269,7 +269,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
CmHandleRegistrationResponse.createFailureResponse('cm handle 4', CM_HANDLE_INVALID_ID)]
mockNetworkCmProxyDataServicePropertyHandler.updateCmHandleProperties(_) >> updateOperationResponse
when: 'registration is updated'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the response contains updateOperationResponse'
assert response.updatedCmHandles.size() == 4
assert response.updatedCmHandles.containsAll(updateOperationResponse)
@@ -281,7 +281,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: '#scenario'
mockCpsModuleService.deleteSchemaSetsWithCascade(_, ['cmhandle']) >> { if (!schemaSetExist) { throw new SchemaSetNotFoundException('', '') } }
when: 'registration is updated to delete cmhandle'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the cmHandle state is updated to "DELETING"'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >>
{ args -> args[0].values()[0] == CmHandleState.DELETING }
@@ -315,7 +315,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'cm-handle deletion is successful for 1st and 3rd; failed for 2nd'
mockInventoryPersistence.deleteDataNode("/dmi-registry/cm-handles[@id='cmhandle2']") >> { throw new RuntimeException("Failed") }
when: 'registration is updated to delete cmhandles'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the cmHandle states are all updated to "DELETING"'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch({ assert it.every { entry -> entry.value == CmHandleState.DELETING } })
and: 'a response is received for all cm-handles'
@@ -361,7 +361,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'schema set single deletion failed with unknown error'
mockInventoryPersistence.deleteSchemaSetWithCascade(_) >> { throw new RuntimeException('Failed') }
when: 'registration is updated to delete cmhandle'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'no exception is thrown'
noExceptionThrown()
and: 'cm-handle is not deleted'
@@ -387,7 +387,7 @@ class CmHandleRegistrationServiceSpec extends Specification {
and: 'cm-handle deletion fails on individual delete'
mockInventoryPersistence.deleteDataNode(_) >> { throw deleteListElementException }
when: 'registration is updated to delete cmhandle'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a failure response is received'
assert response.removedCmHandles.size() == 1
with(response.removedCmHandles[0]) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
index 42f0a08ac3..4c554c6af5 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
@@ -56,9 +56,9 @@ class NetworkCmProxyInventoryFacadeSpec extends Specification {
given: 'an (updated) dmi plugin registration'
def dmiPluginRegistration = Mock(DmiPluginRegistration)
when: 'the registration is submitted '
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the call is delegated to the cm handle registration service'
- 1 * mockCmHandleRegistrationService.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ 1 * mockCmHandleRegistrationService.updateDmiRegistration(dmiPluginRegistration)
}
def 'Execute cm handle reference search for inventory'() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
index 160744a7d7..4d715d28c9 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -136,7 +136,7 @@ class ModuleSyncTasksSpec extends Specification {
moduleSyncStartedOnCmHandles.put('cm-handle-1', 'started')
moduleSyncStartedOnCmHandles.put('cm-handle-2', 'started')
when: 'resetting failed cm handles'
- objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2])
+ objectUnderTest.setCmHandlesToAdvised([yangModelCmHandle1, yangModelCmHandle2])
then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(expectedCmHandleStatePerCmHandle)
and: 'after reset performed progress map is empty'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 155edc8bc6..f2c88a511e 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -23,14 +23,16 @@ package org.onap.cps.ncmp.impl.inventory.sync
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.onap.cps.ncmp.impl.utils.Sleeper
import org.onap.cps.spi.model.DataNode
import spock.lang.Specification
import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.locks.Lock
class ModuleSyncWatchdogSpec extends Specification {
- def mockSyncUtils = Mock(ModuleOperationsUtils)
+ def mockModuleOperationsUtils = Mock(ModuleOperationsUtils)
def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
@@ -42,15 +44,23 @@ class ModuleSyncWatchdogSpec extends Specification {
def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
- def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor)
+ def mockWorkQueueLock = Mock(Lock)
+
+ def spiedSleeper = Spy(Sleeper)
+
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper)
void setup() {
spiedAsyncTaskExecutor.setupThreadPool()
}
def 'Module sync advised cm handles with #scenario.'() {
- given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+ given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+ and: 'module sync utilities returns no failed (locked) cm handles'
+ mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
+ and: 'the work queue is not locked'
+ mockWorkQueueLock.tryLock() >> true
and: 'the executor has enough available threads'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
@@ -59,6 +69,7 @@ class ModuleSyncWatchdogSpec extends Specification {
expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
where: 'the following parameter are used'
scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
+ 'none at all' | 0 || 0
'less then 1 batch' | 1 || 1
'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
'2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
@@ -66,9 +77,11 @@ class ModuleSyncWatchdogSpec extends Specification {
'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
}
- def 'Module sync advised cm handles starts with no available threads.'() {
- given: 'sync utilities returns a advise cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ def 'Module sync cm handles starts with no available threads.'() {
+ given: 'module sync utilities returns a advise cm handles'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ and: 'the work queue is not locked'
+ mockWorkQueueLock.tryLock() >> true
and: 'the executor first has no threads but has one thread on the second attempt'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
@@ -77,9 +90,11 @@ class ModuleSyncWatchdogSpec extends Specification {
1 * spiedAsyncTaskExecutor.executeTask(*_)
}
- def 'Module sync advised cm handles already handled.'() {
- given: 'sync utilities returns a advise cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ def 'Module sync advised cm handle already handled by other thread.'() {
+ given: 'module sync utilities returns an advised cm handle'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ and: 'the work queue is not locked'
+ mockWorkQueueLock.tryLock() >> true
and: 'the executor has a thread available'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
@@ -94,7 +109,7 @@ class ModuleSyncWatchdogSpec extends Specification {
given: 'there is still a cm handle in the queue'
moduleSyncWorkQueue.offer(new DataNode())
and: 'sync utilities returns many advise cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(500)
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(500)
and: 'the executor has plenty threads available'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
when: ' module sync is started'
@@ -104,18 +119,42 @@ class ModuleSyncWatchdogSpec extends Specification {
}
def 'Reset failed cm handles.'() {
- given: 'sync utilities returns failed cm handles'
+ given: 'module sync utilities returns failed cm handles'
def failedCmHandles = [new YangModelCmHandle()]
- mockSyncUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
+ mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
when: 'reset failed cm handles is started'
- objectUnderTest.resetPreviouslyFailedCmHandles()
+ objectUnderTest.setPreviouslyLockedCmHandlesToAdvised()
then: 'it is delegated to the module sync task (service)'
- 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles)
+ 1 * mockModuleSyncTasks.setCmHandlesToAdvised(failedCmHandles)
+ }
+
+ def 'Module Sync Locking.'() {
+ given: 'module sync utilities returns an advised cm handle'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ and: 'can lock is : #canLock'
+ mockWorkQueueLock.tryLock() >> canLock
+ when: 'attempt to populate the work queue'
+ objectUnderTest.populateWorkQueueIfNeeded()
+ then: 'the queue remains empty is #expectQueueRemainsEmpty'
+ assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
+ where: 'the following lock states are applied'
+ canLock | expectQueueRemainsEmpty
+ false | true
+ true | false
+ }
+
+ def 'Sleeper gets interrupted.'() {
+ given: 'sleeper gets interrupted'
+ spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
+ when: 'the watchdog attempts to sleep to save cpu cycles'
+ objectUnderTest.preventBusyWait()
+ then: 'no exception is thrown'
+ noExceptionThrown()
}
def createDataNodes(numberOfDataNodes) {
def dataNodes = []
- (1..numberOfDataNodes).each {dataNodes.add(new DataNode())}
+ numberOfDataNodes.times { dataNodes.add(new DataNode()) }
return dataNodes
}
}