diff options
author | kissand <andras.zoltan.kiss@est.tech> | 2022-07-14 12:37:14 +0200 |
---|---|---|
committer | kissand <andras.zoltan.kiss@est.tech> | 2022-07-27 14:23:19 +0200 |
commit | fe835cb6e0030c00d08f9eb84c3f7bb2b3d90c2e (patch) | |
tree | d41f9b5f951e0b40027997479be8964e3122a9ff /cps-ncmp-service/src/main/java/org/onap | |
parent | 054873c7c52bdb9fae718a0d7651d57b1a995dfc (diff) |
Distributed datastore solution for Module Sync Watchdog
- use semaphore map in ModuleSyncWatchdog
- increase test timeout, because it needs more time for hazelcast
initialization
Issue-ID: CPS-1015
Change-Id: I71feed8fbbd047af9fabba29a5f762a1f17a1c78
Signed-off-by: kissand <andras.zoltan.kiss@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap')
2 files changed, 49 insertions, 22 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java index 978c3d16b7..1efe17695e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java @@ -23,7 +23,10 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache; import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,32 +36,40 @@ import org.springframework.context.annotation.Configuration; @Configuration public class SynchronizationSemaphoresConfig { + private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30); + /** * Module Sync Distributed Map Instance. - * @return Instance of Map + * + * @return configured map of module sync semaphore */ @Bean - public Map<String, String> moduleSyncSemaphore() { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")) + public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() { + return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig") .getMap("moduleSyncSemaphore"); } /** * Data Sync Distributed Map Instance. - * @return Instance of Map + * + * @return configured map of data sync semaphore */ @Bean - public Map<String, String> dataSyncSemaphore() { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig("dataSyncSemaphore", "dataSyncSemaphoreConfig")) + public Map<String, String> dataSyncSemaphoreMap() { + return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig") .getMap("dataSyncSemaphore"); } + private HazelcastInstance createHazelcastInstance( + final String hazelcastInstanceName, final String configMapName) { + return Hazelcast.newHazelcastInstance( + initializeDefaultMapConfig(hazelcastInstanceName, configMapName)); + } + private Config initializeDefaultMapConfig(final String instanceName, final String configName) { final Config config = new Config(instanceName); final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setTimeToLiveSeconds(30); + mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS); mapConfig.setBackupCount(3); mapConfig.setAsyncBackupCount(3); config.addMapConfig(mapConfig); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 3f81194fe1..c71f68f772 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.util.List; +import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,25 +50,33 @@ public class ModuleSyncWatchdog { @Value("${data-sync.cache.enabled:false}") private boolean isGlobalDataSyncCacheEnabled; + private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap; + /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") public void executeAdvisedCmHandlePoll() { - syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> { + syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> { final String cmHandleId = advisedCmHandle.getId(); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); - } catch (final Exception e) { - setCompositeStateToLocked().accept(compositeState); - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + if (hasPushedIntoSemaphoreMap(cmHandleId)) { + log.debug("executing module sync on {}", cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); + setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); + updateModuleSyncSemaphoreMap(cmHandleId); + } catch (final Exception e) { + setCompositeStateToLocked().accept(compositeState); + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + } + inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + } else { + log.debug("{} already processed by another instance", cmHandleId); } - inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); }); log.debug("No Cm-Handles currently found in an ADVISED state"); } @@ -119,8 +128,15 @@ public class ModuleSyncWatchdog { private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) { final DataStoreSyncState dataStoreSyncState = dataSyncEnabled - ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED; + ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED; return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build(); } + private void updateModuleSyncSemaphoreMap(final String cmHandleId) { + moduleSyncSemaphoreMap.replace(cmHandleId, true); + } + + private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { + return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null; + } } |