diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java')
6 files changed, 59 insertions, 58 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java index bb9ad50ccf..2ba1b2ab5d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java @@ -59,12 +59,12 @@ public interface CmHandleQueryService { boolean outputAlternateId); /** - * Method which returns cm handles by the cm handles state. + * Method which returns cm handle ids by the cm handles state. * * @param cmHandleState cm handle state - * @return a list of data nodes representing the cm handles. + * @return a list of cm handle ids. */ - Collection<DataNode> queryCmHandlesByState(CmHandleState cmHandleState); + Collection<String> queryCmHandleIdsByState(CmHandleState cmHandleState); /** * Method to return data nodes with ancestor representing the cm handles. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java index 96fa03d7aa..5610013863 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java @@ -21,7 +21,6 @@ package org.onap.cps.ncmp.impl.inventory; -import static org.onap.cps.api.parameters.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS; import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS; import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME; import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR; @@ -45,6 +44,7 @@ import org.onap.cps.ncmp.impl.inventory.models.CmHandleState; import org.onap.cps.ncmp.impl.inventory.models.ModelledDmiServiceLeaves; import org.onap.cps.ncmp.impl.inventory.models.PropertyType; import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelCacheConfig; +import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; @@ -87,14 +87,18 @@ public class CmHandleQueryServiceImpl implements CmHandleQueryService { } @Override - public Collection<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) { - return queryCmHandleAncestorsByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]", - INCLUDE_ALL_DESCENDANTS); + public Collection<String> queryCmHandleIdsByState(final CmHandleState cmHandleState) { + final Collection<DataNode> cmHandlesAsDataNodes = + queryNcmpRegistryByCpsPath("//state[@cm-handle-state='" + cmHandleState + "']", OMIT_DESCENDANTS); + return cmHandlesAsDataNodes.stream() + .map(DataNode::getXpath) + .map(YangDataConverter::extractCmHandleIdFromXpath) + .toList(); } @Override public Collection<DataNode> queryNcmpRegistryByCpsPath(final String cpsPath, - final FetchDescendantsOption fetchDescendantsOption) { + final FetchDescendantsOption fetchDescendantsOption) { return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, cpsPath, fetchDescendantsOption); } @@ -232,5 +236,3 @@ public class CmHandleQueryServiceImpl implements CmHandleQueryService { xpath, OMIT_DESCENDANTS).iterator().next(); } } - - diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java index 8bf13a0ee1..2895d9b77c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java @@ -70,13 +70,10 @@ public class ModuleOperationsUtils { /** * Query data nodes for cm handles with an "ADVISED" cm handle state. * - * @return cm handles (data nodes) in ADVISED state (empty list if none found) + * @return cm handle ids in ADVISED state (empty list if none found) */ - public Collection<DataNode> getAdvisedCmHandles() { - final Collection<DataNode> advisedCmHandlesAsDataNodes = - cmHandleQueryService.queryCmHandlesByState(CmHandleState.ADVISED); - log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size()); - return advisedCmHandlesAsDataNodes; + public Collection<String> getAdvisedCmHandleIds() { + return cmHandleQueryService.queryCmHandleIdsByState(CmHandleState.ADVISED); } /** diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index 81656a4b33..c97b284bf1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -28,14 +28,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.api.model.DataNode; +import org.onap.cps.api.exceptions.DataNodeNotFoundException; import org.onap.cps.ncmp.api.inventory.models.CompositeState; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; import org.onap.cps.ncmp.impl.inventory.models.CmHandleState; import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler; -import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.springframework.stereotype.Component; @RequiredArgsConstructor @@ -51,21 +50,29 @@ public class ModuleSyncTasks { /** * Perform module sync on a batch of cm handles. * - * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @param cmHandleIds a batch of cm handle ids to perform module sync on * @param batchCounter the number of batches currently being processed, will be decreased when * task is finished or fails * @return completed future to handle post-processing */ - public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes, + public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds, final AtomicInteger batchCounter) { - final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = - new HashMap<>(cmHandlesAsDataNodes.size()); + final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size()); try { - cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> { - final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode); - final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle); - cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState); - }); + for (final String cmHandleId : cmHandleIds) { + try { + final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId); + if (isCmHandleInAdvisedState(yangModelCmHandle)) { + final CmHandleState newCmHandleState = processCmHandle(yangModelCmHandle); + cmHandleStatePerCmHandle.put(yangModelCmHandle, newCmHandleState); + } else { + log.warn("Skipping module sync for CM handle '{}' as it is in {} state", cmHandleId, + yangModelCmHandle.getCompositeState().getCmHandleState().name()); + } + } catch (final DataNodeNotFoundException dataNodeNotFoundException) { + log.warn("Skipping module sync for CM handle '{}' as it does not exist", cmHandleId); + } + } } finally { batchCounter.getAndDecrement(); lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); @@ -96,7 +103,7 @@ public class ModuleSyncTasks { } private CmHandleState processCmHandle(final YangModelCmHandle yangModelCmHandle) { - final CompositeState compositeState = inventoryPersistence.getCmHandleState(yangModelCmHandle.getId()); + final CompositeState compositeState = yangModelCmHandle.getCompositeState(); final boolean inUpgrade = ModuleOperationsUtils.inUpgradeOrUpgradeFailed(compositeState); try { if (inUpgrade) { @@ -105,27 +112,25 @@ public class ModuleSyncTasks { moduleSyncService.deleteSchemaSetIfExists(yangModelCmHandle.getId()); moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); } - yangModelCmHandle.getCompositeState().setLockReason(null); + compositeState.setLockReason(null); return CmHandleState.READY; } catch (final Exception e) { log.warn("Processing of {} module failed due to reason {}.", yangModelCmHandle.getId(), e.getMessage()); - final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED - : LockReasonCategory.MODULE_SYNC_FAILED; - moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, - lockReasonCategory, e.getMessage()); - setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + final LockReasonCategory lockReasonCategory = inUpgrade + ? LockReasonCategory.MODULE_UPGRADE_FAILED + : LockReasonCategory.MODULE_SYNC_FAILED; + moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, lockReasonCategory, e.getMessage()); return CmHandleState.LOCKED; } } - private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, - final CompositeState.LockReason lockReason) { - advisedCmHandle.getCompositeState().setLockReason(lockReason); - } - private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) { if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) { log.info("{} removed from in progress map", resetCmHandleId); } } + + private static boolean isCmHandleInAdvisedState(final YangModelCmHandle yangModelCmHandle) { + return yangModelCmHandle.getCompositeState().getCmHandleState() == CmHandleState.ADVISED; + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java index 42c0d8f31f..74bef43d0b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.api.model.DataNode; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.onap.cps.ncmp.impl.utils.Sleeper; import org.springframework.scheduling.annotation.Scheduled; @@ -43,7 +42,7 @@ import org.springframework.stereotype.Service; public class ModuleSyncWatchdog { private final ModuleOperationsUtils moduleOperationsUtils; - private final BlockingQueue<DataNode> moduleSyncWorkQueue; + private final BlockingQueue<String> moduleSyncWorkQueue; private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private final AsyncTaskExecutor asyncTaskExecutor; @@ -70,7 +69,7 @@ public class ModuleSyncWatchdog { populateWorkQueueIfNeeded(); while (!moduleSyncWorkQueue.isEmpty()) { if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) { - final Collection<DataNode> nextBatch = prepareNextBatch(); + final Collection<String> nextBatch = prepareNextBatch(); log.info("Processing module sync batch of {}. {} batch(es) active.", nextBatch.size(), batchCounter.get()); if (!nextBatch.isEmpty()) { @@ -104,14 +103,14 @@ public class ModuleSyncWatchdog { } private void populateWorkQueue() { - final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles(); - if (advisedCmHandles.isEmpty()) { + final Collection<String> advisedCmHandleIds = moduleOperationsUtils.getAdvisedCmHandleIds(); + if (advisedCmHandleIds.isEmpty()) { log.debug("No advised CM handles found in DB."); } else { - log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size()); - advisedCmHandles.forEach(advisedCmHandle -> { - final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id")); - if (moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", + advisedCmHandleIds.size()); + advisedCmHandleIds.forEach(cmHandleId -> { + if (moduleSyncWorkQueue.offer(cmHandleId)) { log.info("CM handle {} added to the work queue.", cmHandleId); } else { log.warn("Failed to add CM handle {} to the work queue.", cmHandleId); @@ -133,13 +132,12 @@ public class ModuleSyncWatchdog { } } - private Collection<DataNode> prepareNextBatch() { - final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); - final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + private Collection<String> prepareNextBatch() { + final Collection<String> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + final Collection<String> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE); log.info("nextBatchCandidates size : {}", nextBatchCandidates.size()); - for (final DataNode batchCandidate : nextBatchCandidates) { - final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id")); + for (final String cmHandleId : nextBatchCandidates) { final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals( moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP, SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS)); @@ -147,7 +145,7 @@ public class ModuleSyncWatchdog { log.info("module sync for {} already in progress by other instance", cmHandleId); } else { log.info("Adding cmHandle : {} to current batch", cmHandleId); - nextBatch.add(batchCandidate); + nextBatch.add(cmHandleId); } } log.info("nextBatch size : {}", nextBatch.size()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index 9cf29aacac..671e791ac2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================== - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.api.model.DataNode; import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -53,10 +52,10 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { /** * Module Sync Distributed Queue Instance. * - * @return queue of cm handles (data nodes) that need module sync + * @return queue of cm handle ids that need module sync */ @Bean - public BlockingQueue<DataNode> moduleSyncWorkQueue() { + public BlockingQueue<String> moduleSyncWorkQueue() { return getOrCreateHazelcastInstance(commonQueueConfig).getQueue("moduleSyncWorkQueue"); } |