diff options
author | ToineSiebelink <toine.siebelink@est.tech> | 2024-11-05 12:04:03 +0000 |
---|---|---|
committer | ToineSiebelink <toine.siebelink@est.tech> | 2024-11-18 09:27:07 +0000 |
commit | a0d4bc39ec35534688047772797f42a38780bc29 (patch) | |
tree | 886a639b46efdb3f4b7c8a21412770f8a664e124 /cps-ncmp-service/src/main | |
parent | 37962e3faca4f2306546c4f70d480b0c323d2c68 (diff) |
Test to highlight ModuleSetTag Inefficiencies
- Add (micrometer) instrumentation to expose inefficiencies
- Add test config for micrometer
- Add setup methods in base to create many cm handles
- Set module sync parallelism to 2 for testing
- Add clean up methods for hazelcast related tests
- added test to show inefficiencies
- POC 1 use hazelcast set to prevent multiple threads working on same ModuleSetTag
- POC 2 'cache' module set tags per thread to prevent DB looks ups
- Main inefficiency left: create schemaset for EACH cm Handled even if same tag. No easy PoC...
Change-Id: Idf46b44c475a24727dd7084bb613459f4c29be55
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main')
5 files changed, 70 insertions, 10 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java index 109a541cb3..345eefec2a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java @@ -51,6 +51,7 @@ public class HazelcastCacheConfig { protected HazelcastInstance getOrCreateHazelcastInstance(final NamedConfig namedConfig) { return Hazelcast.getOrCreateHazelcastInstance(defineInstanceConfig(instanceConfigName, namedConfig)); + } private Config defineInstanceConfig(final String instanceConfigName, final NamedConfig namedConfig) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java index 8ba70b3a31..a056efd6ce 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java @@ -26,6 +26,7 @@ import static org.onap.cps.ncmp.impl.models.RequiredDmiService.MODEL; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import io.micrometer.core.annotation.Timed; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.inventory.models.YangResource; import org.onap.cps.ncmp.impl.dmi.DmiProperties; import org.onap.cps.ncmp.impl.dmi.DmiRestClient; @@ -48,6 +50,7 @@ import org.springframework.stereotype.Service; /** * Operations class for DMI Model. */ +@Slf4j @RequiredArgsConstructor @Service public class DmiModelOperations { @@ -62,6 +65,8 @@ public class DmiModelOperations { * @param yangModelCmHandle the yang model cm handle * @return module references */ + @Timed(value = "cps.ncmp.inventory.module.references.from.dmi", + description = "Time taken to get all module references for a cm handle from dmi") public List<ModuleReference> getModuleReferences(final YangModelCmHandle yangModelCmHandle) { final DmiRequestBody dmiRequestBody = DmiRequestBody.builder() .moduleSetTag(yangModelCmHandle.getModuleSetTag()).build(); @@ -79,6 +84,8 @@ public class DmiModelOperations { * @param newModuleReferences the unknown module references * @return yang resources as map of module name to yang(re)source */ + @Timed(value = "cps.ncmp.inventory.yang.resources.from.dmi", + description = "Time taken to get list of yang resources from dmi") public Map<String, String> getNewYangResourcesFromDmi(final YangModelCmHandle yangModelCmHandle, final Collection<ModuleReference> newModuleReferences) { if (newModuleReferences.isEmpty()) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java index ca0f1c6a6d..ba50dd3c19 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java @@ -26,16 +26,20 @@ import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT; import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME; +import com.hazelcast.collection.ISet; import java.time.OffsetDateTime; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.util.Strings; import org.onap.cps.api.CpsAnchorService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsModuleService; +import org.onap.cps.ncmp.api.exceptions.NcmpException; import org.onap.cps.ncmp.impl.inventory.models.CmHandleState; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.onap.cps.spi.CascadeDeleteAllowed; @@ -50,12 +54,15 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class ModuleSyncService { + private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap(); + private final DmiModelOperations dmiModelOperations; private final CpsModuleService cpsModuleService; private final CpsDataService cpsDataService; private final CpsAnchorService cpsAnchorService; private final JsonObjectMapper jsonObjectMapper; - private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap(); + private final ISet<String> moduleSetTagsBeingProcessed; + private final Map<String, ModuleDelta> privateModuleSetCache = new HashMap<>(); @AllArgsConstructor private static final class ModuleDelta { @@ -69,11 +76,37 @@ public class ModuleSyncService { * @param yangModelCmHandle the yang model of cm handle. */ public void syncAndCreateSchemaSetAndAnchor(final YangModelCmHandle yangModelCmHandle) { - final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle, yangModelCmHandle.getModuleSetTag()); - final String cmHandleId = yangModelCmHandle.getId(); - cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, + final String moduleSetTag = yangModelCmHandle.getModuleSetTag(); + final ModuleDelta moduleDelta; + boolean isNewModuleSetTag = Strings.isNotBlank(moduleSetTag); + try { + if (privateModuleSetCache.containsKey(moduleSetTag)) { + moduleDelta = privateModuleSetCache.get(moduleSetTag); + } else { + if (isNewModuleSetTag) { + if (moduleSetTagsBeingProcessed.add(moduleSetTag)) { + log.info("Processing new module set tag {}", moduleSetTag); + } else { + isNewModuleSetTag = false; + throw new NcmpException("Concurrent processing of module set tag " + moduleSetTag, + moduleSetTag + " already being processed for cm handle " + yangModelCmHandle.getId()); + } + } + moduleDelta = getModuleDelta(yangModelCmHandle, moduleSetTag); + } + final String cmHandleId = yangModelCmHandle.getId(); + cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, moduleDelta.newModuleNameToContentMap, moduleDelta.allModuleReferences); - cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId); + cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId); + if (isNewModuleSetTag) { + final ModuleDelta noModuleDelta = new ModuleDelta(moduleDelta.allModuleReferences, NO_NEW_MODULES); + privateModuleSetCache.put(moduleSetTag, noModuleDelta); + } + } finally { + if (isNewModuleSetTag) { + moduleSetTagsBeingProcessed.remove(moduleSetTag); + } + } } /** @@ -105,6 +138,10 @@ public class ModuleSyncService { } } + public void clearPrivateModuleSetCache() { + privateModuleSetCache.clear(); + } + private ModuleDelta getModuleDelta(final YangModelCmHandle yangModelCmHandle, final String targetModuleSetTag) { final Map<String, String> newYangResources; Collection<ModuleReference> allModuleReferences = getModuleReferencesByModuleSetTag(targetModuleSetTag); @@ -120,7 +157,7 @@ public class ModuleSyncService { } private Collection<ModuleReference> getModuleReferencesByModuleSetTag(final String moduleSetTag) { - if (moduleSetTag == null || moduleSetTag.trim().isEmpty()) { + if (Strings.isBlank(moduleSetTag)) { return Collections.emptyList(); } return cpsModuleService.getModuleReferencesByAttribute(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index 31fcbad08b..7cc74a3b55 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -63,7 +63,8 @@ public class ModuleSyncTasks { try { cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> { final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode); - cmHandleStatePerCmHandle.put(yangModelCmHandle, processCmHandle(yangModelCmHandle)); + final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle); + cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState); }); } finally { batchCounter.getAndDecrement(); @@ -127,4 +128,4 @@ public class ModuleSyncTasks { log.info("{} removed from in progress map", resetCmHandleId); } } -}
\ No newline at end of file +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index 1f33cc349d..b98075c06c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -20,8 +20,10 @@ package org.onap.cps.ncmp.impl.inventory.sync; +import com.hazelcast.collection.ISet; import com.hazelcast.config.MapConfig; import com.hazelcast.config.QueueConfig; +import com.hazelcast.config.SetConfig; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.locks.Lock; @@ -44,6 +46,8 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig"); private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig"); private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); + private static final SetConfig moduleSetTagsBeingProcessedConfig + = createSetConfig("moduleSetTagsBeingProcessedConfig"); private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock"; /** @@ -63,8 +67,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { */ @Bean public IMap<String, Object> moduleSyncStartedOnCmHandles() { - return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap( - "moduleSyncStartedOnCmHandles"); + return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap("moduleSyncStartedOnCmHandles"); } /** @@ -78,6 +81,17 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { } /** + * Collection of (new) module set tags being processed. + * To prevent processing on multiple threads of same tag + * + * @return set of module set tags being processed + */ + @Bean + public ISet<String> moduleSetTagsBeingProcessed() { + return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed"); + } + + /** * Retrieves a distributed lock used to control access to the work queue for module synchronization. * This lock ensures that the population and modification of the work queue are thread-safe and * protected from concurrent access across different nodes in the distributed system. |