summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java55
1 files changed, 35 insertions, 20 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
index 395fb01f4..45ba07804 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
@@ -1,5 +1,5 @@
/*
- * ============LICENSE_START=======================================================
+ * ============LICENSE_START=======================================================
* Copyright (C) 2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,14 +21,15 @@
package org.onap.cps.ncmp.api.inventory.sync;
import java.time.OffsetDateTime;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsDataService;
-import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -37,39 +38,45 @@ import org.springframework.stereotype.Service;
@Service
public class DataSyncWatchdog {
+ private static final boolean DATA_SYNC_IN_PROGRESS = false;
+ private static final boolean DATA_SYNC_DONE = true;
+
private final InventoryPersistence inventoryPersistence;
private final CpsDataService cpsDataService;
private final SyncUtils syncUtils;
+ @Qualifier("dataSyncSemaphores")
+ private final ConcurrentMap<String, Boolean> dataSyncSemaphores;
+
/**
* Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
* 'UNSYNCHRONIZED'.
*/
@Scheduled(fixedDelayString = "${timers.cm-handle-data-sync.sleep-time-ms:30000}")
public void executeUnSynchronizedReadyCmHandlePoll() {
- YangModelCmHandle unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle();
- while (unSynchronizedReadyCmHandle != null) {
+ syncUtils.getUnsynchronizedReadyCmHandles().forEach(unSynchronizedReadyCmHandle -> {
final String cmHandleId = unSynchronizedReadyCmHandle.getId();
- log.debug("Cm-Handles found in READY and UNSYNCHRONIZED state: {}", cmHandleId);
- final CompositeState compositeState = inventoryPersistence
- .getCmHandleState(cmHandleId);
- final String resourceData = syncUtils.getResourceData(cmHandleId);
- if (resourceData == null) {
- log.debug("Error accessing the node for Cm-Handle: {}", cmHandleId);
- } else if (unSynchronizedReadyCmHandle.getCompositeState().getDataSyncEnabled().equals(false)) {
- log.debug("Error: data sync enabled for {} must be true."
- + "Data sync enabled is currently set to false", cmHandleId);
+ if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+ log.debug("Executing data sync on {}", cmHandleId);
+ final CompositeState compositeState = inventoryPersistence
+ .getCmHandleState(cmHandleId);
+ final String resourceData = syncUtils.getResourceData(cmHandleId);
+ if (resourceData == null) {
+ log.debug("Error retrieving resource data for Cm-Handle: {}", cmHandleId);
+ } else {
+ cpsDataService.saveData("NFP-Operational", cmHandleId,
+ resourceData, OffsetDateTime.now());
+ setSyncStateToSynchronized().accept(compositeState);
+ inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ updateDataSyncSemaphoreMap(cmHandleId);
+ }
} else {
- cpsDataService.saveData("NFP-Operational", cmHandleId,
- resourceData, OffsetDateTime.now());
- setSyncStateToSynchronized().accept(compositeState);
- inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ log.debug("{} already processed by another instance", cmHandleId);
}
- unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle();
- }
- log.debug("No Cm-Handles currently found in an READY State and Operational Sync State is UNSYNCHRONIZED");
+ });
+ log.debug("No Cm-Handles currently found in READY State and Operational Sync State is UNSYNCHRONIZED");
}
private Consumer<CompositeState> setSyncStateToSynchronized() {
@@ -81,4 +88,12 @@ public class DataSyncWatchdog {
.lastSyncTime(CompositeState.nowInSyncTimeFormat()).build());
};
}
+
+ private void updateDataSyncSemaphoreMap(final String cmHandleId) {
+ dataSyncSemaphores.replace(cmHandleId, DATA_SYNC_DONE);
+ }
+
+ private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+ return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS) == null;
+ }
}