diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java')
4 files changed, 34 insertions, 27 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index 3f440d65bd..0983889802 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -27,6 +27,7 @@ import static org.onap.cps.ncmp.api.impl.constants.DmiRegistryConstants.NFP_OPER import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum; import static org.onap.cps.utils.CmHandleQueryRestParametersValidator.validateCmHandleQueryParameters; +import com.hazelcast.map.IMap; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Collection; @@ -77,20 +78,14 @@ import org.springframework.stereotype.Service; public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService { private final JsonObjectMapper jsonObjectMapper; - private final DmiDataOperations dmiDataOperations; - private final NetworkCmProxyDataServicePropertyHandler networkCmProxyDataServicePropertyHandler; - private final InventoryPersistence inventoryPersistence; - private final CmHandleQueries cmHandleQueries; - private final NetworkCmProxyCmHandlerQueryService networkCmProxyCmHandlerQueryService; - private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; - private final CpsDataService cpsDataService; + private final IMap<String, Object> moduleSyncStartedOnCmHandles; @Override public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule( @@ -329,7 +324,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId); lcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.DELETING); - deleteCmHandleByCmHandleId(cmHandleId); + deleteCmHandleFromDbAndModuleSyncMap(cmHandleId); cmHandleRegistrationResponses.add(CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)); lcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.DELETED); @@ -353,9 +348,17 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService return cmHandleRegistrationResponses; } - private void deleteCmHandleByCmHandleId(final String cmHandleId) { + private void deleteCmHandleFromDbAndModuleSyncMap(final String cmHandleId) { inventoryPersistence.deleteSchemaSetWithCascade(cmHandleId); inventoryPersistence.deleteListOrListElement("/dmi-registry/cm-handles[@id='" + cmHandleId + "']"); + removeDeletedCmHandleFromModuleSyncMap(cmHandleId); + } + + // CPS-1239 Robustness cleaning of in progress cache + private void removeDeletedCmHandleFromModuleSyncMap(final String deletedCmHandleId) { + if (moduleSyncStartedOnCmHandles.remove(deletedCmHandleId) != null) { + log.debug("{} removed from in progress map", deletedCmHandleId); + } } private List<CmHandleRegistrationResponse> registerNewCmHandles(final Map<YangModelCmHandle, CmHandleState> diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java index abde4c2d54..c89388b291 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -26,7 +26,7 @@ import com.hazelcast.config.NamedConfig; import com.hazelcast.config.QueueConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; -import java.util.Map; +import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.onap.cps.spi.model.DataNode; @@ -62,7 +62,7 @@ public class SynchronizationCacheConfig { * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed */ @Bean - public Map<String, Object> moduleSyncStartedOnCmHandles() { + public IMap<String, Object> moduleSyncStartedOnCmHandles() { return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig) .getMap("moduleSyncStartedOnCmHandles"); } @@ -73,7 +73,7 @@ public class SynchronizationCacheConfig { * @return configured map of data sync semaphores */ @Bean - public Map<String, Boolean> dataSyncSemaphores() { + public IMap<String, Boolean> dataSyncSemaphores() { return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig) .getMap("dataSyncSemaphores"); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java index f914547a50..004ef289ac 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -20,6 +20,7 @@ package org.onap.cps.ncmp.api.inventory.sync; +import com.hazelcast.map.IMap; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -46,15 +47,14 @@ public class ModuleSyncTasks { private final SyncUtils syncUtils; private final ModuleSyncService moduleSyncService; private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; - - private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null); + private final IMap<String, Object> moduleSyncStartedOnCmHandles; /** * Perform module sync on a batch of cm handles. * - * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on - * @param batchCounter the number of batches currently being processed, will be decreased when task is finished - * or fails + * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @param batchCounter the number of batches currently being processed, will be decreased when + * task is finished or fails * @return completed future to handle post-processing */ public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes, @@ -71,7 +71,7 @@ public class ModuleSyncTasks { moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); } catch (final Exception e) { - log.warn("Processing module sync batch failed."); + log.warn("Processing of {} module sync failed.", cmHandleId); syncUtils.updateLockReasonDetailsAndAttempts(compositeState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); @@ -84,28 +84,28 @@ public class ModuleSyncTasks { batchCounter.getAndDecrement(); log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get()); } - return COMPLETED_FUTURE; + return CompletableFuture.completedFuture(null); } /** * Reset state to "ADVISED" for any previously failed cm handles. * * @param failedCmHandles previously failed (locked) cm handles - * @return completed future to handle post-processing */ - public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) { + public void resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) { final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size()); for (final YangModelCmHandle failedCmHandle : failedCmHandles) { final CompositeState compositeState = failedCmHandle.getCompositeState(); final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); if (isReadyForRetry) { + final String resetCmHandleId = failedCmHandle.getId(); log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog", - failedCmHandle.getId()); + resetCmHandleId); cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED); + removeResetCmHandleFromModuleSyncMap(resetCmHandleId); } } lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); - return COMPLETED_FUTURE; } private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, @@ -113,4 +113,9 @@ public class ModuleSyncTasks { advisedCmHandle.getCompositeState().setLockReason(lockReason); } + private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) { + if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) { + log.debug("{} removed from in progress map", resetCmHandleId); + } + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index cafcdc67ff..b96889fc58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -21,10 +21,10 @@ package org.onap.cps.ncmp.api.inventory.sync; +import com.hazelcast.map.IMap; import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -44,7 +44,7 @@ public class ModuleSyncWatchdog { private final SyncUtils syncUtils; private final BlockingQueue<DataNode> moduleSyncWorkQueue; - private final Map<String, Object> moduleSyncStartedOnCmHandles; + private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private final AsyncTaskExecutor asyncTaskExecutor; private static final int MODULE_SYNC_BATCH_SIZE = 100; @@ -72,8 +72,7 @@ public class ModuleSyncWatchdog { nextBatch.size(), batchCounter.get()); asyncTaskExecutor.executeTask(() -> moduleSyncTasks.performModuleSync(nextBatch, batchCounter), - ASYNC_TASK_TIMEOUT_IN_MILLISECONDS - ); + ASYNC_TASK_TIMEOUT_IN_MILLISECONDS); batchCounter.getAndIncrement(); } else { preventBusyWait(); |