diff options
author | ToineSiebelink <toine.siebelink@est.tech> | 2024-11-05 12:04:03 +0000 |
---|---|---|
committer | ToineSiebelink <toine.siebelink@est.tech> | 2024-11-18 09:27:07 +0000 |
commit | a0d4bc39ec35534688047772797f42a38780bc29 (patch) | |
tree | 886a639b46efdb3f4b7c8a21412770f8a664e124 | |
parent | 37962e3faca4f2306546c4f70d480b0c323d2c68 (diff) |
Test to highlight ModuleSetTag Inefficiencies
- Add (micrometer) instrumentation to expose inefficiencies
- Add test config for micrometer
- Add setup methods in base to create many cm handles
- Set module sync parallelism to 2 for testing
- Add clean up methods for hazelcast related tests
- added test to show inefficiencies
- POC 1 use hazelcast set to prevent multiple threads working on same ModuleSetTag
- POC 2 'cache' module set tags per thread to prevent DB looks ups
- Main inefficiency left: create schemaset for EACH cm Handled even if same tag. No easy PoC...
Change-Id: Idf46b44c475a24727dd7084bb613459f4c29be55
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
17 files changed, 345 insertions, 89 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java index 109a541cb3..345eefec2a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java @@ -51,6 +51,7 @@ public class HazelcastCacheConfig { protected HazelcastInstance getOrCreateHazelcastInstance(final NamedConfig namedConfig) { return Hazelcast.getOrCreateHazelcastInstance(defineInstanceConfig(instanceConfigName, namedConfig)); + } private Config defineInstanceConfig(final String instanceConfigName, final NamedConfig namedConfig) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java index 8ba70b3a31..a056efd6ce 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java @@ -26,6 +26,7 @@ import static org.onap.cps.ncmp.impl.models.RequiredDmiService.MODEL; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import io.micrometer.core.annotation.Timed; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.inventory.models.YangResource; import org.onap.cps.ncmp.impl.dmi.DmiProperties; import org.onap.cps.ncmp.impl.dmi.DmiRestClient; @@ -48,6 +50,7 @@ import org.springframework.stereotype.Service; /** * Operations class for DMI Model. */ +@Slf4j @RequiredArgsConstructor @Service public class DmiModelOperations { @@ -62,6 +65,8 @@ public class DmiModelOperations { * @param yangModelCmHandle the yang model cm handle * @return module references */ + @Timed(value = "cps.ncmp.inventory.module.references.from.dmi", + description = "Time taken to get all module references for a cm handle from dmi") public List<ModuleReference> getModuleReferences(final YangModelCmHandle yangModelCmHandle) { final DmiRequestBody dmiRequestBody = DmiRequestBody.builder() .moduleSetTag(yangModelCmHandle.getModuleSetTag()).build(); @@ -79,6 +84,8 @@ public class DmiModelOperations { * @param newModuleReferences the unknown module references * @return yang resources as map of module name to yang(re)source */ + @Timed(value = "cps.ncmp.inventory.yang.resources.from.dmi", + description = "Time taken to get list of yang resources from dmi") public Map<String, String> getNewYangResourcesFromDmi(final YangModelCmHandle yangModelCmHandle, final Collection<ModuleReference> newModuleReferences) { if (newModuleReferences.isEmpty()) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java index ca0f1c6a6d..ba50dd3c19 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java @@ -26,16 +26,20 @@ import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT; import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME; +import com.hazelcast.collection.ISet; import java.time.OffsetDateTime; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.util.Strings; import org.onap.cps.api.CpsAnchorService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsModuleService; +import org.onap.cps.ncmp.api.exceptions.NcmpException; import org.onap.cps.ncmp.impl.inventory.models.CmHandleState; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.onap.cps.spi.CascadeDeleteAllowed; @@ -50,12 +54,15 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class ModuleSyncService { + private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap(); + private final DmiModelOperations dmiModelOperations; private final CpsModuleService cpsModuleService; private final CpsDataService cpsDataService; private final CpsAnchorService cpsAnchorService; private final JsonObjectMapper jsonObjectMapper; - private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap(); + private final ISet<String> moduleSetTagsBeingProcessed; + private final Map<String, ModuleDelta> privateModuleSetCache = new HashMap<>(); @AllArgsConstructor private static final class ModuleDelta { @@ -69,11 +76,37 @@ public class ModuleSyncService { * @param yangModelCmHandle the yang model of cm handle. */ public void syncAndCreateSchemaSetAndAnchor(final YangModelCmHandle yangModelCmHandle) { - final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle, yangModelCmHandle.getModuleSetTag()); - final String cmHandleId = yangModelCmHandle.getId(); - cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, + final String moduleSetTag = yangModelCmHandle.getModuleSetTag(); + final ModuleDelta moduleDelta; + boolean isNewModuleSetTag = Strings.isNotBlank(moduleSetTag); + try { + if (privateModuleSetCache.containsKey(moduleSetTag)) { + moduleDelta = privateModuleSetCache.get(moduleSetTag); + } else { + if (isNewModuleSetTag) { + if (moduleSetTagsBeingProcessed.add(moduleSetTag)) { + log.info("Processing new module set tag {}", moduleSetTag); + } else { + isNewModuleSetTag = false; + throw new NcmpException("Concurrent processing of module set tag " + moduleSetTag, + moduleSetTag + " already being processed for cm handle " + yangModelCmHandle.getId()); + } + } + moduleDelta = getModuleDelta(yangModelCmHandle, moduleSetTag); + } + final String cmHandleId = yangModelCmHandle.getId(); + cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, moduleDelta.newModuleNameToContentMap, moduleDelta.allModuleReferences); - cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId); + cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId); + if (isNewModuleSetTag) { + final ModuleDelta noModuleDelta = new ModuleDelta(moduleDelta.allModuleReferences, NO_NEW_MODULES); + privateModuleSetCache.put(moduleSetTag, noModuleDelta); + } + } finally { + if (isNewModuleSetTag) { + moduleSetTagsBeingProcessed.remove(moduleSetTag); + } + } } /** @@ -105,6 +138,10 @@ public class ModuleSyncService { } } + public void clearPrivateModuleSetCache() { + privateModuleSetCache.clear(); + } + private ModuleDelta getModuleDelta(final YangModelCmHandle yangModelCmHandle, final String targetModuleSetTag) { final Map<String, String> newYangResources; Collection<ModuleReference> allModuleReferences = getModuleReferencesByModuleSetTag(targetModuleSetTag); @@ -120,7 +157,7 @@ public class ModuleSyncService { } private Collection<ModuleReference> getModuleReferencesByModuleSetTag(final String moduleSetTag) { - if (moduleSetTag == null || moduleSetTag.trim().isEmpty()) { + if (Strings.isBlank(moduleSetTag)) { return Collections.emptyList(); } return cpsModuleService.getModuleReferencesByAttribute(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index 31fcbad08b..7cc74a3b55 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -63,7 +63,8 @@ public class ModuleSyncTasks { try { cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> { final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode); - cmHandleStatePerCmHandle.put(yangModelCmHandle, processCmHandle(yangModelCmHandle)); + final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle); + cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState); }); } finally { batchCounter.getAndDecrement(); @@ -127,4 +128,4 @@ public class ModuleSyncTasks { log.info("{} removed from in progress map", resetCmHandleId); } } -}
\ No newline at end of file +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index 1f33cc349d..b98075c06c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -20,8 +20,10 @@ package org.onap.cps.ncmp.impl.inventory.sync; +import com.hazelcast.collection.ISet; import com.hazelcast.config.MapConfig; import com.hazelcast.config.QueueConfig; +import com.hazelcast.config.SetConfig; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.locks.Lock; @@ -44,6 +46,8 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig"); private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig"); private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); + private static final SetConfig moduleSetTagsBeingProcessedConfig + = createSetConfig("moduleSetTagsBeingProcessedConfig"); private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock"; /** @@ -63,8 +67,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { */ @Bean public IMap<String, Object> moduleSyncStartedOnCmHandles() { - return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap( - "moduleSyncStartedOnCmHandles"); + return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap("moduleSyncStartedOnCmHandles"); } /** @@ -78,6 +81,17 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { } /** + * Collection of (new) module set tags being processed. + * To prevent processing on multiple threads of same tag + * + * @return set of module set tags being processed + */ + @Bean + public ISet<String> moduleSetTagsBeingProcessed() { + return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed"); + } + + /** * Retrieves a distributed lock used to control access to the work queue for module synchronization. * This lock ensures that the population and modification of the work queue are thread-safe and * protected from concurrent access across different nodes in the distributed system. diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy index 0bd838437d..c08ff75a44 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy @@ -22,12 +22,17 @@ package org.onap.cps.ncmp.impl.cache import com.hazelcast.config.Config import com.hazelcast.config.RestEndpointGroup +import com.hazelcast.core.Hazelcast import spock.lang.Specification class HazelcastCacheConfigSpec extends Specification { def objectUnderTest = new HazelcastCacheConfig() + def cleanupSpec() { + Hazelcast.getHazelcastInstanceByName('my instance config').shutdown() + } + def 'Create Hazelcast instance with a #scenario'() { given: 'a cluster name and instance config name' objectUnderTest.clusterName = 'my cluster' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy index 6030e5debf..2f13a9a483 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy @@ -20,14 +20,17 @@ package org.onap.cps.ncmp.impl.inventory.sync +import com.hazelcast.collection.ISet import org.onap.cps.api.CpsAnchorService import org.onap.cps.api.CpsDataService import org.onap.cps.api.CpsModuleService +import org.onap.cps.ncmp.api.exceptions.NcmpException import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle import org.onap.cps.ncmp.impl.inventory.CmHandleQueryService import org.onap.cps.ncmp.impl.inventory.models.CmHandleState import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle +import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService.ModuleDelta import org.onap.cps.spi.CascadeDeleteAllowed import org.onap.cps.spi.exceptions.SchemaSetNotFoundException import org.onap.cps.spi.model.ModuleReference @@ -45,18 +48,22 @@ class ModuleSyncServiceSpec extends Specification { def mockCmHandleQueries = Mock(CmHandleQueryService) def mockCpsDataService = Mock(CpsDataService) def mockJsonObjectMapper = Mock(JsonObjectMapper) + def mockModuleSetTagsBeingProcessed = Mock(ISet<String>); - def objectUnderTest = new ModuleSyncService(mockDmiModelOperations, mockCpsModuleService, - mockCpsDataService, mockCpsAnchorService, mockJsonObjectMapper) + def objectUnderTest = new ModuleSyncService(mockDmiModelOperations, mockCpsModuleService, mockCpsDataService, mockCpsAnchorService, mockJsonObjectMapper, mockModuleSetTagsBeingProcessed) def expectedDataspaceName = NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME - def 'Sync model for a NEW cm handle using module set tags: #scenario.'() { - given: 'a cm handle state to be synced' - def ncmpServiceCmHandle = new NcmpServiceCmHandle() - ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).build()) - ncmpServiceCmHandle.cmHandleId = 'ch-1' - def yangModelCmHandle = YangModelCmHandle.toYangModelCmHandle('some service name', '', '', ncmpServiceCmHandle, moduleSetTag, '', '') + def setup() { + // Allow tags for al test except 'duplicate-processing-tag' to be added to processing semaphore + mockModuleSetTagsBeingProcessed.add('new-tag') >> true + mockModuleSetTagsBeingProcessed.add('same-tag') >> true + mockModuleSetTagsBeingProcessed.add('cached-tag') >> true + } + + def 'Sync models for a NEW cm handle using module set tags: #scenario.'() { + given: 'a cm handle to be synced' + def yangModelCmHandle = createAdvisedCmHandle(moduleSetTag) and: 'DMI operations returns some module references' def moduleReferences = [ new ModuleReference('module1','1'), new ModuleReference('module2','2') ] mockDmiModelOperations.getModuleReferences(yangModelCmHandle) >> moduleReferences @@ -75,10 +82,60 @@ class ModuleSyncServiceSpec extends Specification { where: 'the following parameters are used' scenario | identifiedNewModuleReferences | newModuleNameContentToMap | moduleSetTag | existingModuleReferences 'one new module, new tag' | [new ModuleReference('module1', '1')] | [module1: 'some yang source'] | '' | [] - 'no new module, new tag' | [] | [:] | 'new-tag-1' | [] + 'no new module, new tag' | [] | [:] | 'new-tag' | [] 'same tag' | [] | [:] | 'same-tag' | [new ModuleReference('module1', '1'), new ModuleReference('module2', '2')] } + def 'Attempt Sync models for a cm handle with exception and #scenario module set tag'() { + given: 'a cm handle to be synced' + def yangModelCmHandle = createAdvisedCmHandle(moduleSetTag) + and: 'the service returns a list of module references when queried with the specified attributes' + mockCpsModuleService.getModuleReferencesByAttribute(*_) >> [new ModuleReference('module1', '1')] + and: 'exception occurs when try to store result' + def testException = new RuntimeException('test') + mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw testException } + when: 'module sync is triggered' + objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle) + then: 'the same exception is thrown up' + def exceptionThrown = thrown(Exception) + assert testException == exceptionThrown + and: 'module set tag is removed from processing semaphores only when needed' + expectedCallsToRemoveTag * mockModuleSetTagsBeingProcessed.remove('new-tag') + where: 'following module set tags are used' + scenario | moduleSetTag || expectedCallsToRemoveTag + 'with' | 'new-tag' || 1 + 'without' | ' ' || 0 + } + + def 'Sync models for a cm handle with previously cached module set tag.'() { + given: 'a cm handle to be synced' + def yangModelCmHandle = createAdvisedCmHandle('cached-tag') + and: 'The module set tag exist in the private cache' + def moduleReferences = [ new ModuleReference('module1','1') ] + def cachedModuleDelta = new ModuleDelta(moduleReferences, [:]) + objectUnderTest.privateModuleSetCache.put('cached-tag', cachedModuleDelta) + when: 'module sync is triggered' + objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle) + then: 'create schema set from module is invoked with correct parameters' + 1 * mockCpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, 'ch-1', [:], moduleReferences) + and: 'anchor is created with the correct parameters' + 1 * mockCpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, 'ch-1', 'ch-1') + } + + def 'Attempt to sync using a module set tag already being processed by a different instance or thread.'() { + given: 'a cm handle to be synced' + def yangModelCmHandle = createAdvisedCmHandle('duplicateTag') + and: 'The module set tag already exist in the processing semaphore set' + mockModuleSetTagsBeingProcessed.add('duplicate-processing-tag') > false + when: 'module sync is triggered' + objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle) + then: 'a ncmp exception is thrown with the relevant details' + def exceptionThrown = thrown(NcmpException) + assert exceptionThrown.message.contains('duplicateTag') + assert exceptionThrown.details.contains('duplicateTag') + assert exceptionThrown.details.contains('ch-1') + } + def 'Upgrade model for an existing cm handle with Module Set Tag where the modules are #scenario'() { given: 'a cm handle being upgraded to module set tag: tag-1' def ncmpServiceCmHandle = new NcmpServiceCmHandle() @@ -113,7 +170,7 @@ class ModuleSyncServiceSpec extends Specification { 'in database' | [new ModuleReference('module1', '1')] } - def 'upgrade model for a existing cm handle'() { + def 'upgrade model for an existing cm handle'() { given: 'a cm handle that is ready but locked for upgrade' def ncmpServiceCmHandle = new NcmpServiceCmHandle() ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder() @@ -159,4 +216,20 @@ class ModuleSyncServiceSpec extends Specification { result == unsupportedOperationException } + def 'Clear module set cache.'() { + given: 'something in the module set cache' + objectUnderTest.privateModuleSetCache.put('test',new ModuleDelta([],[:])) + when: 'the cache is cleared' + objectUnderTest.clearPrivateModuleSetCache() + then: 'the cache is empty' + objectUnderTest.privateModuleSetCache.isEmpty() + } + + def createAdvisedCmHandle(moduleSetTag) { + def ncmpServiceCmHandle = new NcmpServiceCmHandle() + ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).build()) + ncmpServiceCmHandle.cmHandleId = 'ch-1' + return YangModelCmHandle.toYangModelCmHandle('some service name', '', '', ncmpServiceCmHandle, moduleSetTag, '', '') + } + } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy index 8ce1e934f2..e21c868bbf 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy @@ -26,6 +26,7 @@ import ch.qos.logback.classic.Logger import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.read.ListAppender import com.hazelcast.config.Config +import com.hazelcast.core.Hazelcast import com.hazelcast.instance.impl.HazelcastInstanceFactory import com.hazelcast.map.IMap import org.onap.cps.ncmp.api.inventory.models.CompositeState @@ -75,6 +76,10 @@ class ModuleSyncTasksSpec extends Specification { def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles) + def cleanupSpec() { + Hazelcast.getHazelcastInstanceByName('hazelcastInstanceName').shutdown() + } + def 'Module Sync ADVISED cm handles.'() { given: 'cm handles in an ADVISED state' def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy index 4c96d6b822..c2ecf927c8 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy @@ -20,6 +20,7 @@ package org.onap.cps.ncmp.impl.inventory.sync +import com.hazelcast.collection.ISet import com.hazelcast.config.Config import com.hazelcast.core.Hazelcast import com.hazelcast.map.IMap @@ -38,13 +39,16 @@ import java.util.concurrent.TimeUnit class SynchronizationCacheConfigSpec extends Specification { @Autowired - private BlockingQueue<DataNode> moduleSyncWorkQueue + BlockingQueue<DataNode> moduleSyncWorkQueue @Autowired - private IMap<String, Object> moduleSyncStartedOnCmHandles + IMap<String, Object> moduleSyncStartedOnCmHandles @Autowired - private IMap<String, Boolean> dataSyncSemaphores + IMap<String, Boolean> dataSyncSemaphores + + @Autowired + ISet<String> moduleSetTagsBeingProcessed def cleanupSpec() { Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown() @@ -57,8 +61,11 @@ class SynchronizationCacheConfigSpec extends Specification { assert null != moduleSyncStartedOnCmHandles and: 'system is able to create an instance of a map to hold data sync semaphores' assert null != dataSyncSemaphores - and: 'they have the correct names (in any order)' - assert Hazelcast.allHazelcastInstances.name.contains('cps-and-ncmp-hazelcast-instance-test-config') + and: 'system is able to create an instance of a set to hold module set tags being processed' + assert null != moduleSetTagsBeingProcessed + and: 'there is only one instance with the correct name' + assert Hazelcast.allHazelcastInstances.size() == 1 + assert Hazelcast.allHazelcastInstances.name[0] == 'cps-and-ncmp-hazelcast-instance-test-config' } def 'Verify configs for Distributed objects'(){ @@ -103,7 +110,6 @@ class SynchronizationCacheConfigSpec extends Specification { then: 'applied properties are reflected' assert testConfig.networkConfig.join.kubernetesConfig.enabled assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name' - } def 'Time to Live Verify for Module Sync Semaphore'() { diff --git a/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java index 6f491ba3b7..3368aee148 100755 --- a/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java @@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; +import io.micrometer.core.annotation.Timed; import jakarta.transaction.Transactional; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -186,6 +187,8 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ // can occur in case of specific concurrent requests. @Retryable(retryFor = DuplicatedYangResourceException.class, maxAttempts = 5, backoff = @Backoff(random = true, delay = 200, maxDelay = 2000, multiplier = 2)) + @Timed(value = "cps.module.persistence.schemaset.store", + description = "Time taken to store a schemaset (list of module references") public void storeSchemaSetFromModules(final String dataspaceName, final String schemaSetName, final Map<String, String> newModuleNameToContentMap, final Collection<ModuleReference> allModuleReferences) { diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java index a600b22b61..4063a7f769 100644 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java @@ -171,8 +171,8 @@ public class CpsModuleServiceImpl implements CpsModuleService { return cpsModulePersistenceService.identifyNewModuleReferences(moduleReferencesToCheck); } - @Timed(value = "cps.module.service.module.reference.query", - description = "Time taken to query list of module references") + @Timed(value = "cps.module.service.module.reference.query.by.attribute", + description = "Time taken to query list of module references by attribute (e.g moduleSetTag)") @Override public Collection<ModuleReference> getModuleReferencesByAttribute(final String dataspaceName, final String anchorName, diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy index 759eccd966..02a10cfa6b 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy @@ -21,6 +21,7 @@ package org.onap.cps.integration.base +import com.hazelcast.collection.ISet import okhttp3.mockwebserver.MockWebServer import org.onap.cps.api.CpsAnchorService import org.onap.cps.api.CpsDataService @@ -37,13 +38,13 @@ import org.onap.cps.ncmp.impl.data.NetworkCmProxyQueryService import org.onap.cps.ncmp.impl.inventory.InventoryPersistence import org.onap.cps.ncmp.impl.inventory.ParameterizedCmHandleQueryService import org.onap.cps.ncmp.impl.inventory.models.CmHandleState +import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher import org.onap.cps.ri.repository.DataspaceRepository import org.onap.cps.ri.utils.SessionManager import org.onap.cps.spi.exceptions.DataspaceNotFoundException import org.onap.cps.spi.model.DataNode -import org.onap.cps.utils.ContentType import org.onap.cps.utils.JsonObjectMapper import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value @@ -61,13 +62,8 @@ import spock.lang.Specification import spock.util.concurrent.PollingConditions import java.time.OffsetDateTime -import java.time.format.DateTimeFormatter import java.util.concurrent.BlockingQueue -import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME -import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR -import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT - @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = [CpsDataspaceService]) @Testcontainers @EnableAutoConfiguration @@ -121,6 +117,9 @@ abstract class CpsIntegrationSpecBase extends Specification { ModuleSyncWatchdog moduleSyncWatchdog @Autowired + ModuleSyncService moduleSyncService + + @Autowired BlockingQueue<DataNode> moduleSyncWorkQueue @Autowired @@ -132,6 +131,8 @@ abstract class CpsIntegrationSpecBase extends Specification { @Autowired AlternateIdMatcher alternateIdMatcher + @Autowired + ISet<String> moduleSetTagsBeingProcessed @Value('${ncmp.policy-executor.server.port:8080}') private String policyServerPort; @@ -174,13 +175,13 @@ abstract class CpsIntegrationSpecBase extends Specification { DMI1_URL = String.format("http://%s:%s", mockDmiServer1.getHostName(), mockDmiServer1.getPort()) DMI2_URL = String.format("http://%s:%s", mockDmiServer2.getHostName(), mockDmiServer2.getPort()) - } def cleanup() { mockDmiServer1.shutdown() mockDmiServer2.shutdown() mockPolicyServer.shutdown() + moduleSetTagsBeingProcessed.clear() } def static readResourceDataFile(filename) { @@ -262,11 +263,16 @@ abstract class CpsIntegrationSpecBase extends Specification { networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: [cmHandleToCreate])) } - def registerSequenceOfCmHandlesWithoutWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles) { + def registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles, offset) { def cmHandles = [] + def id = offset + def moduleReferences = (1..200).collect { moduleSetTag + '_Module_' + it.toString() } (1..numberOfCmHandles).each { - def cmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+it, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID) - cmHandles.add(cmHandle) + def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+id, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID) + cmHandles.add(ncmpServiceCmHandle) + dmiDispatcher1.moduleNamesPerCmHandleId[ncmpServiceCmHandle.cmHandleId] = moduleReferences + dmiDispatcher2.moduleNamesPerCmHandleId[ncmpServiceCmHandle.cmHandleId] = moduleReferences + id++ } networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: cmHandles)) } @@ -279,9 +285,10 @@ abstract class CpsIntegrationSpecBase extends Specification { networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds)) } - def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles) { + def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles, offset) { def cmHandleIds = [] - (1..numberOfCmHandles).each { cmHandleIds.add('ch-'+it) } + def id = offset + (1..numberOfCmHandles).each { cmHandleIds.add('ch-' + id++) } networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds)) } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy index 64449371fe..a5e3daf289 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy @@ -33,54 +33,55 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase { NetworkCmProxyInventoryFacade objectUnderTest - static final CM_HANDLE_ID = 'ch-1' - static final CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG = 'ch-2' + def cmHandleId = 'ch-1' + def cmHandleIdWithExistingModuleSetTag = 'ch-2' def setup() { objectUnderTest = networkCmProxyInventoryFacade + moduleSyncService.clearPrivateModuleSetCache() } def 'Upgrade CM-handle with new moduleSetTag or no moduleSetTag.'() { given: 'a CM-handle is created with expected initial modules: M1 and M2' - dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2'] - registerCmHandle(DMI1_URL, CM_HANDLE_ID, initialModuleSetTag) - assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort() + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2'] + registerCmHandle(DMI1_URL, cmHandleId, initialModuleSetTag) + assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() when: "the CM-handle is upgraded with given moduleSetTag '${updatedModuleSetTag}'" - def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag) + def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: updatedModuleSetTag) def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration( new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade)) then: 'registration gives successful response' - assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)] + assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)] and: 'CM-handle is in LOCKED state due to MODULE_UPGRADE' - def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID) + def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(cmHandleId) assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_UPGRADE assert cmHandleCompositeState.lockReason.details == "Upgrade to ModuleSetTag: ${updatedModuleSetTag}" when: 'DMI will return different modules for upgrade: M1 and M3' - dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M3'] + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M3'] and: 'the module sync watchdog is triggered twice' 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() } then: 'CM-handle goes to READY state' new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> { - assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState + assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState }) and: 'the CM-handle has expected moduleSetTag' - assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == updatedModuleSetTag + assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == updatedModuleSetTag and: 'CM-handle has expected updated modules: M1 and M3' - assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort() + assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() cleanup: 'deregister CM-handle' - deregisterCmHandle(DMI1_URL, CM_HANDLE_ID) + deregisterCmHandle(DMI1_URL, cmHandleId) - where: + where: 'following module set tags are used' initialModuleSetTag | updatedModuleSetTag NO_MODULE_SET_TAG | NO_MODULE_SET_TAG NO_MODULE_SET_TAG | 'new' @@ -90,39 +91,39 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase { def 'Upgrade CM-handle with existing moduleSetTag.'() { given: 'DMI will return modules for registration' - dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2'] - dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG] = ['M1', 'M3'] + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2'] + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleIdWithExistingModuleSetTag] = ['M1', 'M3'] and: "an existing CM-handle handle with moduleSetTag '${updatedModuleSetTag}'" - registerCmHandle(DMI1_URL, CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG, updatedModuleSetTag) - assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG).moduleName.sort() + registerCmHandle(DMI1_URL, cmHandleIdWithExistingModuleSetTag, updatedModuleSetTag) + assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleIdWithExistingModuleSetTag).moduleName.sort() and: "a CM-handle with moduleSetTag '${initialModuleSetTag}' which will be upgraded" - registerCmHandle(DMI1_URL, CM_HANDLE_ID, initialModuleSetTag) - assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort() + registerCmHandle(DMI1_URL, cmHandleId, initialModuleSetTag) + assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() when: "CM-handle is upgraded to moduleSetTag '${updatedModuleSetTag}'" - def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag) + def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: updatedModuleSetTag) def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration( new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade)) then: 'registration gives successful response' - assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)] + assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)] and: 'the module sync watchdog is triggered twice' 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() } and: 'CM-handle goes to READY state' new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> { - assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState + assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState }) and: 'the CM-handle has expected moduleSetTag' - assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == updatedModuleSetTag + assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == updatedModuleSetTag and: 'CM-handle has expected updated modules: M1 and M3' - assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort() + assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() cleanup: 'deregister CM-handle' - deregisterCmHandles(DMI1_URL, [CM_HANDLE_ID, CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG]) + deregisterCmHandles(DMI1_URL, [cmHandleId, cmHandleIdWithExistingModuleSetTag]) where: initialModuleSetTag | updatedModuleSetTag @@ -132,37 +133,37 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase { def 'Skip upgrade of CM-handle with same moduleSetTag as before.'() { given: 'an existing CM-handle with expected initial modules: M1 and M2' - dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2'] - registerCmHandle(DMI1_URL, CM_HANDLE_ID, 'same') - assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort() + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2'] + registerCmHandle(DMI1_URL, cmHandleId, 'same') + assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() when: 'CM-handle is upgraded with the same moduleSetTag' - def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'same') + def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: 'same') objectUnderTest.updateDmiRegistration( new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade)) then: 'CM-handle remains in READY state' - assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState + assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState and: 'the CM-handle has same moduleSetTag as before' - assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == 'same' + assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == 'same' then: 'CM-handle has same modules as before: M1 and M2' - assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort() + assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() cleanup: 'deregister CM-handle' - deregisterCmHandle(DMI1_URL, CM_HANDLE_ID) + deregisterCmHandle(DMI1_URL, cmHandleId) } def 'Upgrade of CM-handle fails due to DMI error.'() { given: 'a CM-handle exists' - dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2'] - registerCmHandle(DMI1_URL, CM_HANDLE_ID, 'oldTag') + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2'] + registerCmHandle(DMI1_URL, cmHandleId, 'oldTag') and: 'DMI is not available for upgrade' dmiDispatcher1.isAvailable = false when: 'the CM-handle is upgraded' - def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'newTag') + def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: 'newTag') objectUnderTest.updateDmiRegistration( new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade)) @@ -171,16 +172,16 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase { then: 'CM-handle goes to LOCKED state with reason MODULE_UPGRADE_FAILED' new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> { - def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID) + def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(cmHandleId) assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_UPGRADE_FAILED }) and: 'the CM-handle has same moduleSetTag as before' - assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == 'oldTag' + assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == 'oldTag' cleanup: 'deregister CM-handle' - deregisterCmHandle(DMI1_URL, CM_HANDLE_ID) + deregisterCmHandle(DMI1_URL, cmHandleId) } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy index e0bb437a7c..963bc1fe61 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy @@ -20,26 +20,32 @@ package org.onap.cps.integration.functional.ncmp +import io.micrometer.core.instrument.MeterRegistry import org.onap.cps.integration.base.CpsIntegrationSpecBase import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog +import org.springframework.beans.factory.annotation.Autowired +import spock.util.concurrent.PollingConditions import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { ModuleSyncWatchdog objectUnderTest + @Autowired + MeterRegistry meterRegistry + def executorService = Executors.newFixedThreadPool(2) - def SYNC_SAMPLE_SIZE = 100 + def PARALLEL_SYNC_SAMPLE_SIZE = 100 def setup() { objectUnderTest = moduleSyncWatchdog - registerSequenceOfCmHandlesWithoutWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, SYNC_SAMPLE_SIZE) } def cleanup() { try { - deregisterSequenceOfCmHandles(DMI1_URL, SYNC_SAMPLE_SIZE) + deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1) moduleSyncWorkQueue.clear() } finally { executorService.shutdownNow() @@ -47,15 +53,60 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { } def 'Watchdog is disabled for test.'() { + given: + registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1) when: 'wait a while but less then the initial delay of 10 minutes' Thread.sleep(3000) then: 'the work queue remains empty' assert moduleSyncWorkQueue.isEmpty() } + def 'CPS-2478 Highlight module sync inefficiencies.'() { + given: 'register 250 cm handles with module set tag cps-2478-A' + def numberOfTags = 2 + def cmHandlesPerTag = 250 + def totalCmHandles = numberOfTags * cmHandlesPerTag + def offset = 1 + registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset) + and: 'register anther 250 cm handles with module set tag cps-2478-B' + offset += cmHandlesPerTag + registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset) + and: 'clear any previous instrumentation' + meterRegistry.clear() + when: 'sync all advised cm handles' + objectUnderTest.moduleSyncAdvisedCmHandles() + Thread.sleep(100) + then: 'retry until all schema sets are stored in db (1 schema set for each cm handle)' + def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer() + new PollingConditions().within(10, () -> { + objectUnderTest.moduleSyncAdvisedCmHandles() + Thread.sleep(100) + assert dbSchemaSetStorageTimer.count() >= 500 + }) + then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)' + def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer() + new PollingConditions().within(10, () -> { + assert dbStateUpdateTimer.count() >= 5 + }) + and: 'the db has been queried for tags exactly 2 times.' + def dbModuleQueriesTimer = meterRegistry.get('cps.module.service.module.reference.query.by.attribute').timer() + assert dbModuleQueriesTimer.count() == 2 + and: 'exactly 2 calls to DMI to get module references' + def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer() + assert dmiModuleRetrievalTimer.count() == 2 + and: 'log the relevant instrumentation' + logInstrumentation(dbModuleQueriesTimer, 'query module references') + logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI ') + logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets ') + logInstrumentation(dbStateUpdateTimer, 'batch state updates ') + cleanup: 'remove all cm handles' + deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1) + } + def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() { // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed given: 'the queue is empty at the start' + registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1) assert moduleSyncWorkQueue.isEmpty() when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time' objectUnderTest.populateWorkQueueIfNeeded() @@ -63,12 +114,13 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { and: 'wait a little (to give all threads time to complete their task)' Thread.sleep(50) then: 'the queue size is exactly the sample size' - assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE + assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE } def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() { // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed given: 'the queue is empty at the start' + registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1) assert moduleSyncWorkQueue.isEmpty() when: 'attempt to populate the queue on the main (test) and another parallel thread a little later' objectUnderTest.populateWorkQueueIfNeeded() @@ -76,7 +128,12 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { and: 'wait a little (to give all threads time to complete their task)' Thread.sleep(50) then: 'the queue size is exactly the sample size' - assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE + assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE + } + + def logInstrumentation(timer, description) { + System.out.println('*** CPS-2478, ' + description + ' : ' + timer.count()+ ' times, total ' + timer.totalTime(TimeUnit.MILLISECONDS) + ' ms') + return true } def populateQueueWithoutDelay = () -> { diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy index 56d4bfaee4..f897393a53 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy @@ -43,9 +43,7 @@ class PolicyExecutorIntegrationSpec extends CpsIntegrationSpecBase { } def cleanup() { - deregisterCmHandle(DMI1_URL, 'ch-1') - deregisterCmHandle(DMI1_URL, 'ch-2') - deregisterCmHandle(DMI1_URL, 'ch-3') + deregisterSequenceOfCmHandles(DMI1_URL, 3, 1) } def 'Policy Executor create request with #scenario.'() { diff --git a/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java b/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java new file mode 100644 index 0000000000..3b26f42c8a --- /dev/null +++ b/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the 'License'); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.integration; + +import io.micrometer.core.aop.TimedAspect; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MicroMeterTestConfig { + @Bean + public MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); // Use a simple in-memory registry for testing + } + + @Bean + public TimedAspect timedAspect(final MeterRegistry meterRegistry) { + return new TimedAspect(meterRegistry); + } +} + diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml index b786a3d4c5..30598dfb90 100644 --- a/integration-test/src/test/resources/application.yml +++ b/integration-test/src/test/resources/application.yml @@ -191,7 +191,7 @@ ncmp: modules-sync-watchdog: async-executor: - parallelism-level: 1 + parallelism-level: 2 model-loader: maximum-attempt-count: 20 |