summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorToineSiebelink <toine.siebelink@est.tech>2024-11-05 12:04:03 +0000
committerToineSiebelink <toine.siebelink@est.tech>2024-11-18 09:27:07 +0000
commita0d4bc39ec35534688047772797f42a38780bc29 (patch)
tree886a639b46efdb3f4b7c8a21412770f8a664e124
parent37962e3faca4f2306546c4f70d480b0c323d2c68 (diff)
Test to highlight ModuleSetTag Inefficiencies
- Add (micrometer) instrumentation to expose inefficiencies - Add test config for micrometer - Add setup methods in base to create many cm handles - Set module sync parallelism to 2 for testing - Add clean up methods for hazelcast related tests - added test to show inefficiencies - POC 1 use hazelcast set to prevent multiple threads working on same ModuleSetTag - POC 2 'cache' module set tags per thread to prevent DB looks ups - Main inefficiency left: create schemaset for EACH cm Handled even if same tag. No easy PoC... Change-Id: Idf46b44c475a24727dd7084bb613459f4c29be55 Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java1
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java7
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java49
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java5
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java18
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy5
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy93
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy5
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy18
-rwxr-xr-xcps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java3
-rw-r--r--cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java4
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy31
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy81
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy67
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy4
-rw-r--r--integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java41
-rw-r--r--integration-test/src/test/resources/application.yml2
17 files changed, 345 insertions, 89 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
index 109a541cb3..345eefec2a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
@@ -51,6 +51,7 @@ public class HazelcastCacheConfig {
protected HazelcastInstance getOrCreateHazelcastInstance(final NamedConfig namedConfig) {
return Hazelcast.getOrCreateHazelcastInstance(defineInstanceConfig(instanceConfigName, namedConfig));
+
}
private Config defineInstanceConfig(final String instanceConfigName, final NamedConfig namedConfig) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java
index 8ba70b3a31..a056efd6ce 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java
@@ -26,6 +26,7 @@ import static org.onap.cps.ncmp.impl.models.RequiredDmiService.MODEL;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
+import io.micrometer.core.annotation.Timed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.inventory.models.YangResource;
import org.onap.cps.ncmp.impl.dmi.DmiProperties;
import org.onap.cps.ncmp.impl.dmi.DmiRestClient;
@@ -48,6 +50,7 @@ import org.springframework.stereotype.Service;
/**
* Operations class for DMI Model.
*/
+@Slf4j
@RequiredArgsConstructor
@Service
public class DmiModelOperations {
@@ -62,6 +65,8 @@ public class DmiModelOperations {
* @param yangModelCmHandle the yang model cm handle
* @return module references
*/
+ @Timed(value = "cps.ncmp.inventory.module.references.from.dmi",
+ description = "Time taken to get all module references for a cm handle from dmi")
public List<ModuleReference> getModuleReferences(final YangModelCmHandle yangModelCmHandle) {
final DmiRequestBody dmiRequestBody = DmiRequestBody.builder()
.moduleSetTag(yangModelCmHandle.getModuleSetTag()).build();
@@ -79,6 +84,8 @@ public class DmiModelOperations {
* @param newModuleReferences the unknown module references
* @return yang resources as map of module name to yang(re)source
*/
+ @Timed(value = "cps.ncmp.inventory.yang.resources.from.dmi",
+ description = "Time taken to get list of yang resources from dmi")
public Map<String, String> getNewYangResourcesFromDmi(final YangModelCmHandle yangModelCmHandle,
final Collection<ModuleReference> newModuleReferences) {
if (newModuleReferences.isEmpty()) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java
index ca0f1c6a6d..ba50dd3c19 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java
@@ -26,16 +26,20 @@ import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT;
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME;
+import com.hazelcast.collection.ISet;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.util.Strings;
import org.onap.cps.api.CpsAnchorService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.ncmp.api.exceptions.NcmpException;
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.spi.CascadeDeleteAllowed;
@@ -50,12 +54,15 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class ModuleSyncService {
+ private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap();
+
private final DmiModelOperations dmiModelOperations;
private final CpsModuleService cpsModuleService;
private final CpsDataService cpsDataService;
private final CpsAnchorService cpsAnchorService;
private final JsonObjectMapper jsonObjectMapper;
- private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap();
+ private final ISet<String> moduleSetTagsBeingProcessed;
+ private final Map<String, ModuleDelta> privateModuleSetCache = new HashMap<>();
@AllArgsConstructor
private static final class ModuleDelta {
@@ -69,11 +76,37 @@ public class ModuleSyncService {
* @param yangModelCmHandle the yang model of cm handle.
*/
public void syncAndCreateSchemaSetAndAnchor(final YangModelCmHandle yangModelCmHandle) {
- final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle, yangModelCmHandle.getModuleSetTag());
- final String cmHandleId = yangModelCmHandle.getId();
- cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+ final String moduleSetTag = yangModelCmHandle.getModuleSetTag();
+ final ModuleDelta moduleDelta;
+ boolean isNewModuleSetTag = Strings.isNotBlank(moduleSetTag);
+ try {
+ if (privateModuleSetCache.containsKey(moduleSetTag)) {
+ moduleDelta = privateModuleSetCache.get(moduleSetTag);
+ } else {
+ if (isNewModuleSetTag) {
+ if (moduleSetTagsBeingProcessed.add(moduleSetTag)) {
+ log.info("Processing new module set tag {}", moduleSetTag);
+ } else {
+ isNewModuleSetTag = false;
+ throw new NcmpException("Concurrent processing of module set tag " + moduleSetTag,
+ moduleSetTag + " already being processed for cm handle " + yangModelCmHandle.getId());
+ }
+ }
+ moduleDelta = getModuleDelta(yangModelCmHandle, moduleSetTag);
+ }
+ final String cmHandleId = yangModelCmHandle.getId();
+ cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
moduleDelta.newModuleNameToContentMap, moduleDelta.allModuleReferences);
- cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId);
+ cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId);
+ if (isNewModuleSetTag) {
+ final ModuleDelta noModuleDelta = new ModuleDelta(moduleDelta.allModuleReferences, NO_NEW_MODULES);
+ privateModuleSetCache.put(moduleSetTag, noModuleDelta);
+ }
+ } finally {
+ if (isNewModuleSetTag) {
+ moduleSetTagsBeingProcessed.remove(moduleSetTag);
+ }
+ }
}
/**
@@ -105,6 +138,10 @@ public class ModuleSyncService {
}
}
+ public void clearPrivateModuleSetCache() {
+ privateModuleSetCache.clear();
+ }
+
private ModuleDelta getModuleDelta(final YangModelCmHandle yangModelCmHandle, final String targetModuleSetTag) {
final Map<String, String> newYangResources;
Collection<ModuleReference> allModuleReferences = getModuleReferencesByModuleSetTag(targetModuleSetTag);
@@ -120,7 +157,7 @@ public class ModuleSyncService {
}
private Collection<ModuleReference> getModuleReferencesByModuleSetTag(final String moduleSetTag) {
- if (moduleSetTag == null || moduleSetTag.trim().isEmpty()) {
+ if (Strings.isBlank(moduleSetTag)) {
return Collections.emptyList();
}
return cpsModuleService.getModuleReferencesByAttribute(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index 31fcbad08b..7cc74a3b55 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -63,7 +63,8 @@ public class ModuleSyncTasks {
try {
cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> {
final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode);
- cmHandleStatePerCmHandle.put(yangModelCmHandle, processCmHandle(yangModelCmHandle));
+ final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle);
+ cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState);
});
} finally {
batchCounter.getAndDecrement();
@@ -127,4 +128,4 @@ public class ModuleSyncTasks {
log.info("{} removed from in progress map", resetCmHandleId);
}
}
-} \ No newline at end of file
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
index 1f33cc349d..b98075c06c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
@@ -20,8 +20,10 @@
package org.onap.cps.ncmp.impl.inventory.sync;
+import com.hazelcast.collection.ISet;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.QueueConfig;
+import com.hazelcast.config.SetConfig;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Lock;
@@ -44,6 +46,8 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig");
private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
+ private static final SetConfig moduleSetTagsBeingProcessedConfig
+ = createSetConfig("moduleSetTagsBeingProcessedConfig");
private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
/**
@@ -63,8 +67,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
*/
@Bean
public IMap<String, Object> moduleSyncStartedOnCmHandles() {
- return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap(
- "moduleSyncStartedOnCmHandles");
+ return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap("moduleSyncStartedOnCmHandles");
}
/**
@@ -78,6 +81,17 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
}
/**
+ * Collection of (new) module set tags being processed.
+ * To prevent processing on multiple threads of same tag
+ *
+ * @return set of module set tags being processed
+ */
+ @Bean
+ public ISet<String> moduleSetTagsBeingProcessed() {
+ return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed");
+ }
+
+ /**
* Retrieves a distributed lock used to control access to the work queue for module synchronization.
* This lock ensures that the population and modification of the work queue are thread-safe and
* protected from concurrent access across different nodes in the distributed system.
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
index 0bd838437d..c08ff75a44 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
@@ -22,12 +22,17 @@ package org.onap.cps.ncmp.impl.cache
import com.hazelcast.config.Config
import com.hazelcast.config.RestEndpointGroup
+import com.hazelcast.core.Hazelcast
import spock.lang.Specification
class HazelcastCacheConfigSpec extends Specification {
def objectUnderTest = new HazelcastCacheConfig()
+ def cleanupSpec() {
+ Hazelcast.getHazelcastInstanceByName('my instance config').shutdown()
+ }
+
def 'Create Hazelcast instance with a #scenario'() {
given: 'a cluster name and instance config name'
objectUnderTest.clusterName = 'my cluster'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy
index 6030e5debf..2f13a9a483 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy
@@ -20,14 +20,17 @@
package org.onap.cps.ncmp.impl.inventory.sync
+import com.hazelcast.collection.ISet
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsModuleService
+import org.onap.cps.ncmp.api.exceptions.NcmpException
import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
import org.onap.cps.ncmp.impl.inventory.CmHandleQueryService
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService.ModuleDelta
import org.onap.cps.spi.CascadeDeleteAllowed
import org.onap.cps.spi.exceptions.SchemaSetNotFoundException
import org.onap.cps.spi.model.ModuleReference
@@ -45,18 +48,22 @@ class ModuleSyncServiceSpec extends Specification {
def mockCmHandleQueries = Mock(CmHandleQueryService)
def mockCpsDataService = Mock(CpsDataService)
def mockJsonObjectMapper = Mock(JsonObjectMapper)
+ def mockModuleSetTagsBeingProcessed = Mock(ISet<String>);
- def objectUnderTest = new ModuleSyncService(mockDmiModelOperations, mockCpsModuleService,
- mockCpsDataService, mockCpsAnchorService, mockJsonObjectMapper)
+ def objectUnderTest = new ModuleSyncService(mockDmiModelOperations, mockCpsModuleService, mockCpsDataService, mockCpsAnchorService, mockJsonObjectMapper, mockModuleSetTagsBeingProcessed)
def expectedDataspaceName = NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
- def 'Sync model for a NEW cm handle using module set tags: #scenario.'() {
- given: 'a cm handle state to be synced'
- def ncmpServiceCmHandle = new NcmpServiceCmHandle()
- ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).build())
- ncmpServiceCmHandle.cmHandleId = 'ch-1'
- def yangModelCmHandle = YangModelCmHandle.toYangModelCmHandle('some service name', '', '', ncmpServiceCmHandle, moduleSetTag, '', '')
+ def setup() {
+ // Allow tags for al test except 'duplicate-processing-tag' to be added to processing semaphore
+ mockModuleSetTagsBeingProcessed.add('new-tag') >> true
+ mockModuleSetTagsBeingProcessed.add('same-tag') >> true
+ mockModuleSetTagsBeingProcessed.add('cached-tag') >> true
+ }
+
+ def 'Sync models for a NEW cm handle using module set tags: #scenario.'() {
+ given: 'a cm handle to be synced'
+ def yangModelCmHandle = createAdvisedCmHandle(moduleSetTag)
and: 'DMI operations returns some module references'
def moduleReferences = [ new ModuleReference('module1','1'), new ModuleReference('module2','2') ]
mockDmiModelOperations.getModuleReferences(yangModelCmHandle) >> moduleReferences
@@ -75,10 +82,60 @@ class ModuleSyncServiceSpec extends Specification {
where: 'the following parameters are used'
scenario | identifiedNewModuleReferences | newModuleNameContentToMap | moduleSetTag | existingModuleReferences
'one new module, new tag' | [new ModuleReference('module1', '1')] | [module1: 'some yang source'] | '' | []
- 'no new module, new tag' | [] | [:] | 'new-tag-1' | []
+ 'no new module, new tag' | [] | [:] | 'new-tag' | []
'same tag' | [] | [:] | 'same-tag' | [new ModuleReference('module1', '1'), new ModuleReference('module2', '2')]
}
+ def 'Attempt Sync models for a cm handle with exception and #scenario module set tag'() {
+ given: 'a cm handle to be synced'
+ def yangModelCmHandle = createAdvisedCmHandle(moduleSetTag)
+ and: 'the service returns a list of module references when queried with the specified attributes'
+ mockCpsModuleService.getModuleReferencesByAttribute(*_) >> [new ModuleReference('module1', '1')]
+ and: 'exception occurs when try to store result'
+ def testException = new RuntimeException('test')
+ mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw testException }
+ when: 'module sync is triggered'
+ objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+ then: 'the same exception is thrown up'
+ def exceptionThrown = thrown(Exception)
+ assert testException == exceptionThrown
+ and: 'module set tag is removed from processing semaphores only when needed'
+ expectedCallsToRemoveTag * mockModuleSetTagsBeingProcessed.remove('new-tag')
+ where: 'following module set tags are used'
+ scenario | moduleSetTag || expectedCallsToRemoveTag
+ 'with' | 'new-tag' || 1
+ 'without' | ' ' || 0
+ }
+
+ def 'Sync models for a cm handle with previously cached module set tag.'() {
+ given: 'a cm handle to be synced'
+ def yangModelCmHandle = createAdvisedCmHandle('cached-tag')
+ and: 'The module set tag exist in the private cache'
+ def moduleReferences = [ new ModuleReference('module1','1') ]
+ def cachedModuleDelta = new ModuleDelta(moduleReferences, [:])
+ objectUnderTest.privateModuleSetCache.put('cached-tag', cachedModuleDelta)
+ when: 'module sync is triggered'
+ objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+ then: 'create schema set from module is invoked with correct parameters'
+ 1 * mockCpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, 'ch-1', [:], moduleReferences)
+ and: 'anchor is created with the correct parameters'
+ 1 * mockCpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, 'ch-1', 'ch-1')
+ }
+
+ def 'Attempt to sync using a module set tag already being processed by a different instance or thread.'() {
+ given: 'a cm handle to be synced'
+ def yangModelCmHandle = createAdvisedCmHandle('duplicateTag')
+ and: 'The module set tag already exist in the processing semaphore set'
+ mockModuleSetTagsBeingProcessed.add('duplicate-processing-tag') > false
+ when: 'module sync is triggered'
+ objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+ then: 'a ncmp exception is thrown with the relevant details'
+ def exceptionThrown = thrown(NcmpException)
+ assert exceptionThrown.message.contains('duplicateTag')
+ assert exceptionThrown.details.contains('duplicateTag')
+ assert exceptionThrown.details.contains('ch-1')
+ }
+
def 'Upgrade model for an existing cm handle with Module Set Tag where the modules are #scenario'() {
given: 'a cm handle being upgraded to module set tag: tag-1'
def ncmpServiceCmHandle = new NcmpServiceCmHandle()
@@ -113,7 +170,7 @@ class ModuleSyncServiceSpec extends Specification {
'in database' | [new ModuleReference('module1', '1')]
}
- def 'upgrade model for a existing cm handle'() {
+ def 'upgrade model for an existing cm handle'() {
given: 'a cm handle that is ready but locked for upgrade'
def ncmpServiceCmHandle = new NcmpServiceCmHandle()
ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder()
@@ -159,4 +216,20 @@ class ModuleSyncServiceSpec extends Specification {
result == unsupportedOperationException
}
+ def 'Clear module set cache.'() {
+ given: 'something in the module set cache'
+ objectUnderTest.privateModuleSetCache.put('test',new ModuleDelta([],[:]))
+ when: 'the cache is cleared'
+ objectUnderTest.clearPrivateModuleSetCache()
+ then: 'the cache is empty'
+ objectUnderTest.privateModuleSetCache.isEmpty()
+ }
+
+ def createAdvisedCmHandle(moduleSetTag) {
+ def ncmpServiceCmHandle = new NcmpServiceCmHandle()
+ ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).build())
+ ncmpServiceCmHandle.cmHandleId = 'ch-1'
+ return YangModelCmHandle.toYangModelCmHandle('some service name', '', '', ncmpServiceCmHandle, moduleSetTag, '', '')
+ }
+
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
index 8ce1e934f2..e21c868bbf 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -26,6 +26,7 @@ import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
import com.hazelcast.instance.impl.HazelcastInstanceFactory
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.inventory.models.CompositeState
@@ -75,6 +76,10 @@ class ModuleSyncTasksSpec extends Specification {
def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService,
mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles)
+ def cleanupSpec() {
+ Hazelcast.getHazelcastInstanceByName('hazelcastInstanceName').shutdown()
+ }
+
def 'Module Sync ADVISED cm handles.'() {
given: 'cm handles in an ADVISED state'
def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy
index 4c96d6b822..c2ecf927c8 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy
@@ -20,6 +20,7 @@
package org.onap.cps.ncmp.impl.inventory.sync
+import com.hazelcast.collection.ISet
import com.hazelcast.config.Config
import com.hazelcast.core.Hazelcast
import com.hazelcast.map.IMap
@@ -38,13 +39,16 @@ import java.util.concurrent.TimeUnit
class SynchronizationCacheConfigSpec extends Specification {
@Autowired
- private BlockingQueue<DataNode> moduleSyncWorkQueue
+ BlockingQueue<DataNode> moduleSyncWorkQueue
@Autowired
- private IMap<String, Object> moduleSyncStartedOnCmHandles
+ IMap<String, Object> moduleSyncStartedOnCmHandles
@Autowired
- private IMap<String, Boolean> dataSyncSemaphores
+ IMap<String, Boolean> dataSyncSemaphores
+
+ @Autowired
+ ISet<String> moduleSetTagsBeingProcessed
def cleanupSpec() {
Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown()
@@ -57,8 +61,11 @@ class SynchronizationCacheConfigSpec extends Specification {
assert null != moduleSyncStartedOnCmHandles
and: 'system is able to create an instance of a map to hold data sync semaphores'
assert null != dataSyncSemaphores
- and: 'they have the correct names (in any order)'
- assert Hazelcast.allHazelcastInstances.name.contains('cps-and-ncmp-hazelcast-instance-test-config')
+ and: 'system is able to create an instance of a set to hold module set tags being processed'
+ assert null != moduleSetTagsBeingProcessed
+ and: 'there is only one instance with the correct name'
+ assert Hazelcast.allHazelcastInstances.size() == 1
+ assert Hazelcast.allHazelcastInstances.name[0] == 'cps-and-ncmp-hazelcast-instance-test-config'
}
def 'Verify configs for Distributed objects'(){
@@ -103,7 +110,6 @@ class SynchronizationCacheConfigSpec extends Specification {
then: 'applied properties are reflected'
assert testConfig.networkConfig.join.kubernetesConfig.enabled
assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
-
}
def 'Time to Live Verify for Module Sync Semaphore'() {
diff --git a/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java
index 6f491ba3b7..3368aee148 100755
--- a/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java
+++ b/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java
@@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
+import io.micrometer.core.annotation.Timed;
import jakarta.transaction.Transactional;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -186,6 +187,8 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ
// can occur in case of specific concurrent requests.
@Retryable(retryFor = DuplicatedYangResourceException.class, maxAttempts = 5, backoff =
@Backoff(random = true, delay = 200, maxDelay = 2000, multiplier = 2))
+ @Timed(value = "cps.module.persistence.schemaset.store",
+ description = "Time taken to store a schemaset (list of module references")
public void storeSchemaSetFromModules(final String dataspaceName, final String schemaSetName,
final Map<String, String> newModuleNameToContentMap,
final Collection<ModuleReference> allModuleReferences) {
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java
index a600b22b61..4063a7f769 100644
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java
@@ -171,8 +171,8 @@ public class CpsModuleServiceImpl implements CpsModuleService {
return cpsModulePersistenceService.identifyNewModuleReferences(moduleReferencesToCheck);
}
- @Timed(value = "cps.module.service.module.reference.query",
- description = "Time taken to query list of module references")
+ @Timed(value = "cps.module.service.module.reference.query.by.attribute",
+ description = "Time taken to query list of module references by attribute (e.g moduleSetTag)")
@Override
public Collection<ModuleReference> getModuleReferencesByAttribute(final String dataspaceName,
final String anchorName,
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
index 759eccd966..02a10cfa6b 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
@@ -21,6 +21,7 @@
package org.onap.cps.integration.base
+import com.hazelcast.collection.ISet
import okhttp3.mockwebserver.MockWebServer
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.api.CpsDataService
@@ -37,13 +38,13 @@ import org.onap.cps.ncmp.impl.data.NetworkCmProxyQueryService
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.ParameterizedCmHandleQueryService
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
+import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService
import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
import org.onap.cps.ri.repository.DataspaceRepository
import org.onap.cps.ri.utils.SessionManager
import org.onap.cps.spi.exceptions.DataspaceNotFoundException
import org.onap.cps.spi.model.DataNode
-import org.onap.cps.utils.ContentType
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
@@ -61,13 +62,8 @@ import spock.lang.Specification
import spock.util.concurrent.PollingConditions
import java.time.OffsetDateTime
-import java.time.format.DateTimeFormatter
import java.util.concurrent.BlockingQueue
-import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME
-import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
-import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT
-
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = [CpsDataspaceService])
@Testcontainers
@EnableAutoConfiguration
@@ -121,6 +117,9 @@ abstract class CpsIntegrationSpecBase extends Specification {
ModuleSyncWatchdog moduleSyncWatchdog
@Autowired
+ ModuleSyncService moduleSyncService
+
+ @Autowired
BlockingQueue<DataNode> moduleSyncWorkQueue
@Autowired
@@ -132,6 +131,8 @@ abstract class CpsIntegrationSpecBase extends Specification {
@Autowired
AlternateIdMatcher alternateIdMatcher
+ @Autowired
+ ISet<String> moduleSetTagsBeingProcessed
@Value('${ncmp.policy-executor.server.port:8080}')
private String policyServerPort;
@@ -174,13 +175,13 @@ abstract class CpsIntegrationSpecBase extends Specification {
DMI1_URL = String.format("http://%s:%s", mockDmiServer1.getHostName(), mockDmiServer1.getPort())
DMI2_URL = String.format("http://%s:%s", mockDmiServer2.getHostName(), mockDmiServer2.getPort())
-
}
def cleanup() {
mockDmiServer1.shutdown()
mockDmiServer2.shutdown()
mockPolicyServer.shutdown()
+ moduleSetTagsBeingProcessed.clear()
}
def static readResourceDataFile(filename) {
@@ -262,11 +263,16 @@ abstract class CpsIntegrationSpecBase extends Specification {
networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: [cmHandleToCreate]))
}
- def registerSequenceOfCmHandlesWithoutWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles) {
+ def registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles, offset) {
def cmHandles = []
+ def id = offset
+ def moduleReferences = (1..200).collect { moduleSetTag + '_Module_' + it.toString() }
(1..numberOfCmHandles).each {
- def cmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+it, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID)
- cmHandles.add(cmHandle)
+ def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+id, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID)
+ cmHandles.add(ncmpServiceCmHandle)
+ dmiDispatcher1.moduleNamesPerCmHandleId[ncmpServiceCmHandle.cmHandleId] = moduleReferences
+ dmiDispatcher2.moduleNamesPerCmHandleId[ncmpServiceCmHandle.cmHandleId] = moduleReferences
+ id++
}
networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: cmHandles))
}
@@ -279,9 +285,10 @@ abstract class CpsIntegrationSpecBase extends Specification {
networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
}
- def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles) {
+ def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles, offset) {
def cmHandleIds = []
- (1..numberOfCmHandles).each { cmHandleIds.add('ch-'+it) }
+ def id = offset
+ (1..numberOfCmHandles).each { cmHandleIds.add('ch-' + id++) }
networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
index 64449371fe..a5e3daf289 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
@@ -33,54 +33,55 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
NetworkCmProxyInventoryFacade objectUnderTest
- static final CM_HANDLE_ID = 'ch-1'
- static final CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG = 'ch-2'
+ def cmHandleId = 'ch-1'
+ def cmHandleIdWithExistingModuleSetTag = 'ch-2'
def setup() {
objectUnderTest = networkCmProxyInventoryFacade
+ moduleSyncService.clearPrivateModuleSetCache()
}
def 'Upgrade CM-handle with new moduleSetTag or no moduleSetTag.'() {
given: 'a CM-handle is created with expected initial modules: M1 and M2'
- dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
- registerCmHandle(DMI1_URL, CM_HANDLE_ID, initialModuleSetTag)
- assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+ registerCmHandle(DMI1_URL, cmHandleId, initialModuleSetTag)
+ assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
when: "the CM-handle is upgraded with given moduleSetTag '${updatedModuleSetTag}'"
- def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag)
+ def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: updatedModuleSetTag)
def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
then: 'registration gives successful response'
- assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)]
+ assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
and: 'CM-handle is in LOCKED state due to MODULE_UPGRADE'
- def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID)
+ def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(cmHandleId)
assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_UPGRADE
assert cmHandleCompositeState.lockReason.details == "Upgrade to ModuleSetTag: ${updatedModuleSetTag}"
when: 'DMI will return different modules for upgrade: M1 and M3'
- dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M3']
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M3']
and: 'the module sync watchdog is triggered twice'
2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
then: 'CM-handle goes to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
- assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
+ assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState
})
and: 'the CM-handle has expected moduleSetTag'
- assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == updatedModuleSetTag
+ assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == updatedModuleSetTag
and: 'CM-handle has expected updated modules: M1 and M3'
- assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+ assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
cleanup: 'deregister CM-handle'
- deregisterCmHandle(DMI1_URL, CM_HANDLE_ID)
+ deregisterCmHandle(DMI1_URL, cmHandleId)
- where:
+ where: 'following module set tags are used'
initialModuleSetTag | updatedModuleSetTag
NO_MODULE_SET_TAG | NO_MODULE_SET_TAG
NO_MODULE_SET_TAG | 'new'
@@ -90,39 +91,39 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
def 'Upgrade CM-handle with existing moduleSetTag.'() {
given: 'DMI will return modules for registration'
- dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
- dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG] = ['M1', 'M3']
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleIdWithExistingModuleSetTag] = ['M1', 'M3']
and: "an existing CM-handle handle with moduleSetTag '${updatedModuleSetTag}'"
- registerCmHandle(DMI1_URL, CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG, updatedModuleSetTag)
- assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG).moduleName.sort()
+ registerCmHandle(DMI1_URL, cmHandleIdWithExistingModuleSetTag, updatedModuleSetTag)
+ assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleIdWithExistingModuleSetTag).moduleName.sort()
and: "a CM-handle with moduleSetTag '${initialModuleSetTag}' which will be upgraded"
- registerCmHandle(DMI1_URL, CM_HANDLE_ID, initialModuleSetTag)
- assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+ registerCmHandle(DMI1_URL, cmHandleId, initialModuleSetTag)
+ assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
when: "CM-handle is upgraded to moduleSetTag '${updatedModuleSetTag}'"
- def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag)
+ def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: updatedModuleSetTag)
def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
then: 'registration gives successful response'
- assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)]
+ assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
and: 'the module sync watchdog is triggered twice'
2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
and: 'CM-handle goes to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
- assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
+ assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState
})
and: 'the CM-handle has expected moduleSetTag'
- assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == updatedModuleSetTag
+ assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == updatedModuleSetTag
and: 'CM-handle has expected updated modules: M1 and M3'
- assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+ assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
cleanup: 'deregister CM-handle'
- deregisterCmHandles(DMI1_URL, [CM_HANDLE_ID, CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG])
+ deregisterCmHandles(DMI1_URL, [cmHandleId, cmHandleIdWithExistingModuleSetTag])
where:
initialModuleSetTag | updatedModuleSetTag
@@ -132,37 +133,37 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
def 'Skip upgrade of CM-handle with same moduleSetTag as before.'() {
given: 'an existing CM-handle with expected initial modules: M1 and M2'
- dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
- registerCmHandle(DMI1_URL, CM_HANDLE_ID, 'same')
- assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+ registerCmHandle(DMI1_URL, cmHandleId, 'same')
+ assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
when: 'CM-handle is upgraded with the same moduleSetTag'
- def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'same')
+ def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: 'same')
objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
then: 'CM-handle remains in READY state'
- assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
+ assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState
and: 'the CM-handle has same moduleSetTag as before'
- assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == 'same'
+ assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == 'same'
then: 'CM-handle has same modules as before: M1 and M2'
- assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+ assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
cleanup: 'deregister CM-handle'
- deregisterCmHandle(DMI1_URL, CM_HANDLE_ID)
+ deregisterCmHandle(DMI1_URL, cmHandleId)
}
def 'Upgrade of CM-handle fails due to DMI error.'() {
given: 'a CM-handle exists'
- dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
- registerCmHandle(DMI1_URL, CM_HANDLE_ID, 'oldTag')
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+ registerCmHandle(DMI1_URL, cmHandleId, 'oldTag')
and: 'DMI is not available for upgrade'
dmiDispatcher1.isAvailable = false
when: 'the CM-handle is upgraded'
- def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'newTag')
+ def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: 'newTag')
objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
@@ -171,16 +172,16 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
then: 'CM-handle goes to LOCKED state with reason MODULE_UPGRADE_FAILED'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
- def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID)
+ def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(cmHandleId)
assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_UPGRADE_FAILED
})
and: 'the CM-handle has same moduleSetTag as before'
- assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == 'oldTag'
+ assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == 'oldTag'
cleanup: 'deregister CM-handle'
- deregisterCmHandle(DMI1_URL, CM_HANDLE_ID)
+ deregisterCmHandle(DMI1_URL, cmHandleId)
}
}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
index e0bb437a7c..963bc1fe61 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
@@ -20,26 +20,32 @@
package org.onap.cps.integration.functional.ncmp
+import io.micrometer.core.instrument.MeterRegistry
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
+import org.springframework.beans.factory.annotation.Autowired
+import spock.util.concurrent.PollingConditions
import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
ModuleSyncWatchdog objectUnderTest
+ @Autowired
+ MeterRegistry meterRegistry
+
def executorService = Executors.newFixedThreadPool(2)
- def SYNC_SAMPLE_SIZE = 100
+ def PARALLEL_SYNC_SAMPLE_SIZE = 100
def setup() {
objectUnderTest = moduleSyncWatchdog
- registerSequenceOfCmHandlesWithoutWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, SYNC_SAMPLE_SIZE)
}
def cleanup() {
try {
- deregisterSequenceOfCmHandles(DMI1_URL, SYNC_SAMPLE_SIZE)
+ deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
moduleSyncWorkQueue.clear()
} finally {
executorService.shutdownNow()
@@ -47,15 +53,60 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
}
def 'Watchdog is disabled for test.'() {
+ given:
+ registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
when: 'wait a while but less then the initial delay of 10 minutes'
Thread.sleep(3000)
then: 'the work queue remains empty'
assert moduleSyncWorkQueue.isEmpty()
}
+ def 'CPS-2478 Highlight module sync inefficiencies.'() {
+ given: 'register 250 cm handles with module set tag cps-2478-A'
+ def numberOfTags = 2
+ def cmHandlesPerTag = 250
+ def totalCmHandles = numberOfTags * cmHandlesPerTag
+ def offset = 1
+ registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
+ and: 'register anther 250 cm handles with module set tag cps-2478-B'
+ offset += cmHandlesPerTag
+ registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
+ and: 'clear any previous instrumentation'
+ meterRegistry.clear()
+ when: 'sync all advised cm handles'
+ objectUnderTest.moduleSyncAdvisedCmHandles()
+ Thread.sleep(100)
+ then: 'retry until all schema sets are stored in db (1 schema set for each cm handle)'
+ def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
+ new PollingConditions().within(10, () -> {
+ objectUnderTest.moduleSyncAdvisedCmHandles()
+ Thread.sleep(100)
+ assert dbSchemaSetStorageTimer.count() >= 500
+ })
+ then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
+ def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
+ new PollingConditions().within(10, () -> {
+ assert dbStateUpdateTimer.count() >= 5
+ })
+ and: 'the db has been queried for tags exactly 2 times.'
+ def dbModuleQueriesTimer = meterRegistry.get('cps.module.service.module.reference.query.by.attribute').timer()
+ assert dbModuleQueriesTimer.count() == 2
+ and: 'exactly 2 calls to DMI to get module references'
+ def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
+ assert dmiModuleRetrievalTimer.count() == 2
+ and: 'log the relevant instrumentation'
+ logInstrumentation(dbModuleQueriesTimer, 'query module references')
+ logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI ')
+ logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets ')
+ logInstrumentation(dbStateUpdateTimer, 'batch state updates ')
+ cleanup: 'remove all cm handles'
+ deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
+ }
+
def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
// This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
given: 'the queue is empty at the start'
+ registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
assert moduleSyncWorkQueue.isEmpty()
when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
objectUnderTest.populateWorkQueueIfNeeded()
@@ -63,12 +114,13 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
and: 'wait a little (to give all threads time to complete their task)'
Thread.sleep(50)
then: 'the queue size is exactly the sample size'
- assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE
+ assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
}
def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
// This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
given: 'the queue is empty at the start'
+ registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
assert moduleSyncWorkQueue.isEmpty()
when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
objectUnderTest.populateWorkQueueIfNeeded()
@@ -76,7 +128,12 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
and: 'wait a little (to give all threads time to complete their task)'
Thread.sleep(50)
then: 'the queue size is exactly the sample size'
- assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE
+ assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
+ }
+
+ def logInstrumentation(timer, description) {
+ System.out.println('*** CPS-2478, ' + description + ' : ' + timer.count()+ ' times, total ' + timer.totalTime(TimeUnit.MILLISECONDS) + ' ms')
+ return true
}
def populateQueueWithoutDelay = () -> {
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy
index 56d4bfaee4..f897393a53 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy
@@ -43,9 +43,7 @@ class PolicyExecutorIntegrationSpec extends CpsIntegrationSpecBase {
}
def cleanup() {
- deregisterCmHandle(DMI1_URL, 'ch-1')
- deregisterCmHandle(DMI1_URL, 'ch-2')
- deregisterCmHandle(DMI1_URL, 'ch-3')
+ deregisterSequenceOfCmHandles(DMI1_URL, 3, 1)
}
def 'Policy Executor create request with #scenario.'() {
diff --git a/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java b/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java
new file mode 100644
index 0000000000..3b26f42c8a
--- /dev/null
+++ b/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the 'License');
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.integration;
+
+import io.micrometer.core.aop.TimedAspect;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MicroMeterTestConfig {
+ @Bean
+ public MeterRegistry meterRegistry() {
+ return new SimpleMeterRegistry(); // Use a simple in-memory registry for testing
+ }
+
+ @Bean
+ public TimedAspect timedAspect(final MeterRegistry meterRegistry) {
+ return new TimedAspect(meterRegistry);
+ }
+}
+
diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml
index b786a3d4c5..30598dfb90 100644
--- a/integration-test/src/test/resources/application.yml
+++ b/integration-test/src/test/resources/application.yml
@@ -191,7 +191,7 @@ ncmp:
modules-sync-watchdog:
async-executor:
- parallelism-level: 1
+ parallelism-level: 2
model-loader:
maximum-attempt-count: 20