diff options
4 files changed, 56 insertions, 24 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java index 978c3d16b7..1efe17695e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java @@ -23,7 +23,10 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache; import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,32 +36,40 @@ import org.springframework.context.annotation.Configuration; @Configuration public class SynchronizationSemaphoresConfig { + private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30); + /** * Module Sync Distributed Map Instance. - * @return Instance of Map + * + * @return configured map of module sync semaphore */ @Bean - public Map<String, String> moduleSyncSemaphore() { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")) + public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() { + return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig") .getMap("moduleSyncSemaphore"); } /** * Data Sync Distributed Map Instance. - * @return Instance of Map + * + * @return configured map of data sync semaphore */ @Bean - public Map<String, String> dataSyncSemaphore() { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig("dataSyncSemaphore", "dataSyncSemaphoreConfig")) + public Map<String, String> dataSyncSemaphoreMap() { + return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig") .getMap("dataSyncSemaphore"); } + private HazelcastInstance createHazelcastInstance( + final String hazelcastInstanceName, final String configMapName) { + return Hazelcast.newHazelcastInstance( + initializeDefaultMapConfig(hazelcastInstanceName, configMapName)); + } + private Config initializeDefaultMapConfig(final String instanceName, final String configName) { final Config config = new Config(instanceName); final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setTimeToLiveSeconds(30); + mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS); mapConfig.setBackupCount(3); mapConfig.setAsyncBackupCount(3); config.addMapConfig(mapConfig); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 3f81194fe1..c71f68f772 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.util.List; +import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,25 +50,33 @@ public class ModuleSyncWatchdog { @Value("${data-sync.cache.enabled:false}") private boolean isGlobalDataSyncCacheEnabled; + private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap; + /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") public void executeAdvisedCmHandlePoll() { - syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> { + syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> { final String cmHandleId = advisedCmHandle.getId(); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); - } catch (final Exception e) { - setCompositeStateToLocked().accept(compositeState); - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + if (hasPushedIntoSemaphoreMap(cmHandleId)) { + log.debug("executing module sync on {}", cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); + setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); + updateModuleSyncSemaphoreMap(cmHandleId); + } catch (final Exception e) { + setCompositeStateToLocked().accept(compositeState); + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + } + inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + } else { + log.debug("{} already processed by another instance", cmHandleId); } - inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); }); log.debug("No Cm-Handles currently found in an ADVISED state"); } @@ -119,8 +128,15 @@ public class ModuleSyncWatchdog { private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) { final DataStoreSyncState dataStoreSyncState = dataSyncEnabled - ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED; + ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED; return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build(); } + private void updateModuleSyncSemaphoreMap(final String cmHandleId) { + moduleSyncSemaphoreMap.replace(cmHandleId, true); + } + + private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { + return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null; + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 40a0e39b9b..7455438cc2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -30,6 +30,9 @@ import org.onap.cps.ncmp.api.inventory.LockReasonCategory import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder import spock.lang.Specification +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + class ModuleSyncWatchdogSpec extends Specification { def mockInventoryPersistence = Mock(InventoryPersistence) @@ -38,9 +41,11 @@ class ModuleSyncWatchdogSpec extends Specification { def mockModuleSyncService = Mock(ModuleSyncService) + def stubbedMap = Stub(ConcurrentMap) + def cmHandleState = CmHandleState.ADVISED - def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService) + def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap) def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles where #scenario'() { given: 'cm handles in an advised state and a data sync state' diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy index a1f6d580fd..ceb9dd4cf3 100644 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy @@ -37,7 +37,7 @@ class SessionManagerIntegrationSpec extends CpsPersistenceSpecBase{ CpsSessionFactory cpsSessionFactory def sessionId - def shortTimeoutForTesting = 200L + def shortTimeoutForTesting = 300L def setup(){ sessionId = objectUnderTest.startSession() |