aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java16
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java9
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java51
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java28
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java7
6 files changed, 59 insertions, 58 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java
index bb9ad50ccf..2ba1b2ab5d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java
@@ -59,12 +59,12 @@ public interface CmHandleQueryService {
boolean outputAlternateId);
/**
- * Method which returns cm handles by the cm handles state.
+ * Method which returns cm handle ids by the cm handles state.
*
* @param cmHandleState cm handle state
- * @return a list of data nodes representing the cm handles.
+ * @return a list of cm handle ids.
*/
- Collection<DataNode> queryCmHandlesByState(CmHandleState cmHandleState);
+ Collection<String> queryCmHandleIdsByState(CmHandleState cmHandleState);
/**
* Method to return data nodes with ancestor representing the cm handles.
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java
index 96fa03d7aa..5610013863 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java
@@ -21,7 +21,6 @@
package org.onap.cps.ncmp.impl.inventory;
-import static org.onap.cps.api.parameters.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS;
import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME;
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR;
@@ -45,6 +44,7 @@ import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
import org.onap.cps.ncmp.impl.inventory.models.ModelledDmiServiceLeaves;
import org.onap.cps.ncmp.impl.inventory.models.PropertyType;
import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelCacheConfig;
+import org.onap.cps.ncmp.impl.utils.YangDataConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@@ -87,14 +87,18 @@ public class CmHandleQueryServiceImpl implements CmHandleQueryService {
}
@Override
- public Collection<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) {
- return queryCmHandleAncestorsByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
- INCLUDE_ALL_DESCENDANTS);
+ public Collection<String> queryCmHandleIdsByState(final CmHandleState cmHandleState) {
+ final Collection<DataNode> cmHandlesAsDataNodes =
+ queryNcmpRegistryByCpsPath("//state[@cm-handle-state='" + cmHandleState + "']", OMIT_DESCENDANTS);
+ return cmHandlesAsDataNodes.stream()
+ .map(DataNode::getXpath)
+ .map(YangDataConverter::extractCmHandleIdFromXpath)
+ .toList();
}
@Override
public Collection<DataNode> queryNcmpRegistryByCpsPath(final String cpsPath,
- final FetchDescendantsOption fetchDescendantsOption) {
+ final FetchDescendantsOption fetchDescendantsOption) {
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, cpsPath,
fetchDescendantsOption);
}
@@ -232,5 +236,3 @@ public class CmHandleQueryServiceImpl implements CmHandleQueryService {
xpath, OMIT_DESCENDANTS).iterator().next();
}
}
-
-
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java
index 8bf13a0ee1..2895d9b77c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java
@@ -70,13 +70,10 @@ public class ModuleOperationsUtils {
/**
* Query data nodes for cm handles with an "ADVISED" cm handle state.
*
- * @return cm handles (data nodes) in ADVISED state (empty list if none found)
+ * @return cm handle ids in ADVISED state (empty list if none found)
*/
- public Collection<DataNode> getAdvisedCmHandles() {
- final Collection<DataNode> advisedCmHandlesAsDataNodes =
- cmHandleQueryService.queryCmHandlesByState(CmHandleState.ADVISED);
- log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size());
- return advisedCmHandlesAsDataNodes;
+ public Collection<String> getAdvisedCmHandleIds() {
+ return cmHandleQueryService.queryCmHandleIdsByState(CmHandleState.ADVISED);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index 81656a4b33..c97b284bf1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -28,14 +28,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.model.DataNode;
+import org.onap.cps.api.exceptions.DataNodeNotFoundException;
import org.onap.cps.ncmp.api.inventory.models.CompositeState;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler;
-import org.onap.cps.ncmp.impl.utils.YangDataConverter;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@@ -51,21 +50,29 @@ public class ModuleSyncTasks {
/**
* Perform module sync on a batch of cm handles.
*
- * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+ * @param cmHandleIds a batch of cm handle ids to perform module sync on
* @param batchCounter the number of batches currently being processed, will be decreased when
* task is finished or fails
* @return completed future to handle post-processing
*/
- public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
+ public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
final AtomicInteger batchCounter) {
- final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle =
- new HashMap<>(cmHandlesAsDataNodes.size());
+ final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
try {
- cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> {
- final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode);
- final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle);
- cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState);
- });
+ for (final String cmHandleId : cmHandleIds) {
+ try {
+ final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId);
+ if (isCmHandleInAdvisedState(yangModelCmHandle)) {
+ final CmHandleState newCmHandleState = processCmHandle(yangModelCmHandle);
+ cmHandleStatePerCmHandle.put(yangModelCmHandle, newCmHandleState);
+ } else {
+ log.warn("Skipping module sync for CM handle '{}' as it is in {} state", cmHandleId,
+ yangModelCmHandle.getCompositeState().getCmHandleState().name());
+ }
+ } catch (final DataNodeNotFoundException dataNodeNotFoundException) {
+ log.warn("Skipping module sync for CM handle '{}' as it does not exist", cmHandleId);
+ }
+ }
} finally {
batchCounter.getAndDecrement();
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
@@ -96,7 +103,7 @@ public class ModuleSyncTasks {
}
private CmHandleState processCmHandle(final YangModelCmHandle yangModelCmHandle) {
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(yangModelCmHandle.getId());
+ final CompositeState compositeState = yangModelCmHandle.getCompositeState();
final boolean inUpgrade = ModuleOperationsUtils.inUpgradeOrUpgradeFailed(compositeState);
try {
if (inUpgrade) {
@@ -105,27 +112,25 @@ public class ModuleSyncTasks {
moduleSyncService.deleteSchemaSetIfExists(yangModelCmHandle.getId());
moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
}
- yangModelCmHandle.getCompositeState().setLockReason(null);
+ compositeState.setLockReason(null);
return CmHandleState.READY;
} catch (final Exception e) {
log.warn("Processing of {} module failed due to reason {}.", yangModelCmHandle.getId(), e.getMessage());
- final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED
- : LockReasonCategory.MODULE_SYNC_FAILED;
- moduleOperationsUtils.updateLockReasonWithAttempts(compositeState,
- lockReasonCategory, e.getMessage());
- setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+ final LockReasonCategory lockReasonCategory = inUpgrade
+ ? LockReasonCategory.MODULE_UPGRADE_FAILED
+ : LockReasonCategory.MODULE_SYNC_FAILED;
+ moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, lockReasonCategory, e.getMessage());
return CmHandleState.LOCKED;
}
}
- private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
- final CompositeState.LockReason lockReason) {
- advisedCmHandle.getCompositeState().setLockReason(lockReason);
- }
-
private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) {
if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) {
log.info("{} removed from in progress map", resetCmHandleId);
}
}
+
+ private static boolean isCmHandleInAdvisedState(final YangModelCmHandle yangModelCmHandle) {
+ return yangModelCmHandle.getCompositeState().getCmHandleState() == CmHandleState.ADVISED;
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 42c0d8f31f..74bef43d0b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.model.DataNode;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.springframework.scheduling.annotation.Scheduled;
@@ -43,7 +42,7 @@ import org.springframework.stereotype.Service;
public class ModuleSyncWatchdog {
private final ModuleOperationsUtils moduleOperationsUtils;
- private final BlockingQueue<DataNode> moduleSyncWorkQueue;
+ private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
@@ -70,7 +69,7 @@ public class ModuleSyncWatchdog {
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
- final Collection<DataNode> nextBatch = prepareNextBatch();
+ final Collection<String> nextBatch = prepareNextBatch();
log.info("Processing module sync batch of {}. {} batch(es) active.",
nextBatch.size(), batchCounter.get());
if (!nextBatch.isEmpty()) {
@@ -104,14 +103,14 @@ public class ModuleSyncWatchdog {
}
private void populateWorkQueue() {
- final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
- if (advisedCmHandles.isEmpty()) {
+ final Collection<String> advisedCmHandleIds = moduleOperationsUtils.getAdvisedCmHandleIds();
+ if (advisedCmHandleIds.isEmpty()) {
log.debug("No advised CM handles found in DB.");
} else {
- log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size());
- advisedCmHandles.forEach(advisedCmHandle -> {
- final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id"));
- if (moduleSyncWorkQueue.offer(advisedCmHandle)) {
+ log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.",
+ advisedCmHandleIds.size());
+ advisedCmHandleIds.forEach(cmHandleId -> {
+ if (moduleSyncWorkQueue.offer(cmHandleId)) {
log.info("CM handle {} added to the work queue.", cmHandleId);
} else {
log.warn("Failed to add CM handle {} to the work queue.", cmHandleId);
@@ -133,13 +132,12 @@ public class ModuleSyncWatchdog {
}
}
- private Collection<DataNode> prepareNextBatch() {
- final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
- final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+ private Collection<String> prepareNextBatch() {
+ final Collection<String> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+ final Collection<String> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE);
log.info("nextBatchCandidates size : {}", nextBatchCandidates.size());
- for (final DataNode batchCandidate : nextBatchCandidates) {
- final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
+ for (final String cmHandleId : nextBatchCandidates) {
final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals(
moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP,
SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS));
@@ -147,7 +145,7 @@ public class ModuleSyncWatchdog {
log.info("module sync for {} already in progress by other instance", cmHandleId);
} else {
log.info("Adding cmHandle : {} to current batch", cmHandleId);
- nextBatch.add(batchCandidate);
+ nextBatch.add(cmHandleId);
}
}
log.info("nextBatch size : {}", nextBatch.size());
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
index 9cf29aacac..671e791ac2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START========================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@ import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.model.DataNode;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -53,10 +52,10 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
/**
* Module Sync Distributed Queue Instance.
*
- * @return queue of cm handles (data nodes) that need module sync
+ * @return queue of cm handle ids that need module sync
*/
@Bean
- public BlockingQueue<DataNode> moduleSyncWorkQueue() {
+ public BlockingQueue<String> moduleSyncWorkQueue() {
return getOrCreateHazelcastInstance(commonQueueConfig).getQueue("moduleSyncWorkQueue");
}