aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkissand <andras.zoltan.kiss@est.tech>2022-07-14 12:37:14 +0200
committerkissand <andras.zoltan.kiss@est.tech>2022-07-27 14:23:19 +0200
commitfe835cb6e0030c00d08f9eb84c3f7bb2b3d90c2e (patch)
treed41f9b5f951e0b40027997479be8964e3122a9ff
parent054873c7c52bdb9fae718a0d7651d57b1a995dfc (diff)
Distributed datastore solution for Module Sync Watchdog
- use semaphore map in ModuleSyncWatchdog - increase test timeout, because it needs more time for hazelcast initialization Issue-ID: CPS-1015 Change-Id: I71feed8fbbd047af9fabba29a5f762a1f17a1c78 Signed-off-by: kissand <andras.zoltan.kiss@est.tech>
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java29
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java42
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy7
-rw-r--r--cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy2
4 files changed, 56 insertions, 24 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
index 978c3d16b7..1efe17695e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
@@ -23,7 +23,10 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -33,32 +36,40 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class SynchronizationSemaphoresConfig {
+ private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30);
+
/**
* Module Sync Distributed Map Instance.
- * @return Instance of Map
+ *
+ * @return configured map of module sync semaphore
*/
@Bean
- public Map<String, String> moduleSyncSemaphore() {
- return Hazelcast.newHazelcastInstance(
- initializeDefaultMapConfig("moduleSyncSemaphore", "moduleSyncSemaphoreConfig"))
+ public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() {
+ return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")
.getMap("moduleSyncSemaphore");
}
/**
* Data Sync Distributed Map Instance.
- * @return Instance of Map
+ *
+ * @return configured map of data sync semaphore
*/
@Bean
- public Map<String, String> dataSyncSemaphore() {
- return Hazelcast.newHazelcastInstance(
- initializeDefaultMapConfig("dataSyncSemaphore", "dataSyncSemaphoreConfig"))
+ public Map<String, String> dataSyncSemaphoreMap() {
+ return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig")
.getMap("dataSyncSemaphore");
}
+ private HazelcastInstance createHazelcastInstance(
+ final String hazelcastInstanceName, final String configMapName) {
+ return Hazelcast.newHazelcastInstance(
+ initializeDefaultMapConfig(hazelcastInstanceName, configMapName));
+ }
+
private Config initializeDefaultMapConfig(final String instanceName, final String configName) {
final Config config = new Config(instanceName);
final MapConfig mapConfig = new MapConfig(configName);
- mapConfig.setTimeToLiveSeconds(30);
+ mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS);
mapConfig.setBackupCount(3);
mapConfig.setAsyncBackupCount(3);
config.addMapConfig(mapConfig);
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index 3f81194fe1..c71f68f772 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -22,6 +22,7 @@
package org.onap.cps.ncmp.api.inventory.sync;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -49,25 +50,33 @@ public class ModuleSyncWatchdog {
@Value("${data-sync.cache.enabled:false}")
private boolean isGlobalDataSyncCacheEnabled;
+ private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+
/**
* Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
*/
@Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
public void executeAdvisedCmHandlePoll() {
- syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> {
+ syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
final String cmHandleId = advisedCmHandle.getId();
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
- try {
- moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
- moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
- setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
- } catch (final Exception e) {
- setCompositeStateToLocked().accept(compositeState);
- syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
- LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+ log.debug("executing module sync on {}", cmHandleId);
+ final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+ try {
+ moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
+ moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
+ setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
+ updateModuleSyncSemaphoreMap(cmHandleId);
+ } catch (final Exception e) {
+ setCompositeStateToLocked().accept(compositeState);
+ syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+ LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ }
+ inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+ } else {
+ log.debug("{} already processed by another instance", cmHandleId);
}
- inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
- log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
});
log.debug("No Cm-Handles currently found in an ADVISED state");
}
@@ -119,8 +128,15 @@ public class ModuleSyncWatchdog {
private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) {
final DataStoreSyncState dataStoreSyncState = dataSyncEnabled
- ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
+ ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build();
}
+ private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
+ moduleSyncSemaphoreMap.replace(cmHandleId, true);
+ }
+
+ private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+ return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+ }
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 40a0e39b9b..7455438cc2 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -30,6 +30,9 @@ import org.onap.cps.ncmp.api.inventory.LockReasonCategory
import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
import spock.lang.Specification
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentMap
+
class ModuleSyncWatchdogSpec extends Specification {
def mockInventoryPersistence = Mock(InventoryPersistence)
@@ -38,9 +41,11 @@ class ModuleSyncWatchdogSpec extends Specification {
def mockModuleSyncService = Mock(ModuleSyncService)
+ def stubbedMap = Stub(ConcurrentMap)
+
def cmHandleState = CmHandleState.ADVISED
- def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService)
+ def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap)
def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles where #scenario'() {
given: 'cm handles in an advised state and a data sync state'
diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy
index a1f6d580fd..ceb9dd4cf3 100644
--- a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy
+++ b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy
@@ -37,7 +37,7 @@ class SessionManagerIntegrationSpec extends CpsPersistenceSpecBase{
CpsSessionFactory cpsSessionFactory
def sessionId
- def shortTimeoutForTesting = 200L
+ def shortTimeoutForTesting = 300L
def setup(){
sessionId = objectUnderTest.startSession()