diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java | 55 |
1 files changed, 35 insertions, 20 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java index 395fb01f4..45ba07804 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java @@ -1,5 +1,5 @@ /* - * ============LICENSE_START======================================================= + * ============LICENSE_START======================================================= * Copyright (C) 2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,14 +21,15 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.time.OffsetDateTime; +import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsDataService; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -37,39 +38,45 @@ import org.springframework.stereotype.Service; @Service public class DataSyncWatchdog { + private static final boolean DATA_SYNC_IN_PROGRESS = false; + private static final boolean DATA_SYNC_DONE = true; + private final InventoryPersistence inventoryPersistence; private final CpsDataService cpsDataService; private final SyncUtils syncUtils; + @Qualifier("dataSyncSemaphores") + private final ConcurrentMap<String, Boolean> dataSyncSemaphores; + /** * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in * 'UNSYNCHRONIZED'. */ @Scheduled(fixedDelayString = "${timers.cm-handle-data-sync.sleep-time-ms:30000}") public void executeUnSynchronizedReadyCmHandlePoll() { - YangModelCmHandle unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle(); - while (unSynchronizedReadyCmHandle != null) { + syncUtils.getUnsynchronizedReadyCmHandles().forEach(unSynchronizedReadyCmHandle -> { final String cmHandleId = unSynchronizedReadyCmHandle.getId(); - log.debug("Cm-Handles found in READY and UNSYNCHRONIZED state: {}", cmHandleId); - final CompositeState compositeState = inventoryPersistence - .getCmHandleState(cmHandleId); - final String resourceData = syncUtils.getResourceData(cmHandleId); - if (resourceData == null) { - log.debug("Error accessing the node for Cm-Handle: {}", cmHandleId); - } else if (unSynchronizedReadyCmHandle.getCompositeState().getDataSyncEnabled().equals(false)) { - log.debug("Error: data sync enabled for {} must be true." - + "Data sync enabled is currently set to false", cmHandleId); + if (hasPushedIntoSemaphoreMap(cmHandleId)) { + log.debug("Executing data sync on {}", cmHandleId); + final CompositeState compositeState = inventoryPersistence + .getCmHandleState(cmHandleId); + final String resourceData = syncUtils.getResourceData(cmHandleId); + if (resourceData == null) { + log.debug("Error retrieving resource data for Cm-Handle: {}", cmHandleId); + } else { + cpsDataService.saveData("NFP-Operational", cmHandleId, + resourceData, OffsetDateTime.now()); + setSyncStateToSynchronized().accept(compositeState); + inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + updateDataSyncSemaphoreMap(cmHandleId); + } } else { - cpsDataService.saveData("NFP-Operational", cmHandleId, - resourceData, OffsetDateTime.now()); - setSyncStateToSynchronized().accept(compositeState); - inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + log.debug("{} already processed by another instance", cmHandleId); } - unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle(); - } - log.debug("No Cm-Handles currently found in an READY State and Operational Sync State is UNSYNCHRONIZED"); + }); + log.debug("No Cm-Handles currently found in READY State and Operational Sync State is UNSYNCHRONIZED"); } private Consumer<CompositeState> setSyncStateToSynchronized() { @@ -81,4 +88,12 @@ public class DataSyncWatchdog { .lastSyncTime(CompositeState.nowInSyncTimeFormat()).build()); }; } + + private void updateDataSyncSemaphoreMap(final String cmHandleId) { + dataSyncSemaphores.replace(cmHandleId, DATA_SYNC_DONE); + } + + private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { + return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS) == null; + } } |