diff options
7 files changed, 302 insertions, 25 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java index 978c3d16b7..1efe17695e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java @@ -23,7 +23,10 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache; import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,32 +36,40 @@ import org.springframework.context.annotation.Configuration; @Configuration public class SynchronizationSemaphoresConfig { + private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30); + /** * Module Sync Distributed Map Instance. - * @return Instance of Map + * + * @return configured map of module sync semaphore */ @Bean - public Map<String, String> moduleSyncSemaphore() { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")) + public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() { + return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig") .getMap("moduleSyncSemaphore"); } /** * Data Sync Distributed Map Instance. - * @return Instance of Map + * + * @return configured map of data sync semaphore */ @Bean - public Map<String, String> dataSyncSemaphore() { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig("dataSyncSemaphore", "dataSyncSemaphoreConfig")) + public Map<String, String> dataSyncSemaphoreMap() { + return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig") .getMap("dataSyncSemaphore"); } + private HazelcastInstance createHazelcastInstance( + final String hazelcastInstanceName, final String configMapName) { + return Hazelcast.newHazelcastInstance( + initializeDefaultMapConfig(hazelcastInstanceName, configMapName)); + } + private Config initializeDefaultMapConfig(final String instanceName, final String configName) { final Config config = new Config(instanceName); final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setTimeToLiveSeconds(30); + mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS); mapConfig.setBackupCount(3); mapConfig.setAsyncBackupCount(3); config.addMapConfig(mapConfig); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 3f81194fe1..c71f68f772 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.util.List; +import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,25 +50,33 @@ public class ModuleSyncWatchdog { @Value("${data-sync.cache.enabled:false}") private boolean isGlobalDataSyncCacheEnabled; + private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap; + /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") public void executeAdvisedCmHandlePoll() { - syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> { + syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> { final String cmHandleId = advisedCmHandle.getId(); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); - } catch (final Exception e) { - setCompositeStateToLocked().accept(compositeState); - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + if (hasPushedIntoSemaphoreMap(cmHandleId)) { + log.debug("executing module sync on {}", cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); + setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState); + updateModuleSyncSemaphoreMap(cmHandleId); + } catch (final Exception e) { + setCompositeStateToLocked().accept(compositeState); + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + } + inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + } else { + log.debug("{} already processed by another instance", cmHandleId); } - inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); }); log.debug("No Cm-Handles currently found in an ADVISED state"); } @@ -119,8 +128,15 @@ public class ModuleSyncWatchdog { private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) { final DataStoreSyncState dataStoreSyncState = dataSyncEnabled - ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED; + ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED; return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build(); } + private void updateModuleSyncSemaphoreMap(final String cmHandleId) { + moduleSyncSemaphoreMap.replace(cmHandleId, true); + } + + private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { + return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null; + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 40a0e39b9b..7455438cc2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -30,6 +30,9 @@ import org.onap.cps.ncmp.api.inventory.LockReasonCategory import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder import spock.lang.Specification +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + class ModuleSyncWatchdogSpec extends Specification { def mockInventoryPersistence = Mock(InventoryPersistence) @@ -38,9 +41,11 @@ class ModuleSyncWatchdogSpec extends Specification { def mockModuleSyncService = Mock(ModuleSyncService) + def stubbedMap = Stub(ConcurrentMap) + def cmHandleState = CmHandleState.ADVISED - def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService) + def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap) def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles where #scenario'() { given: 'cm handles in an advised state and a data sync state' diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy index a1f6d580fd..ceb9dd4cf3 100644 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy @@ -37,7 +37,7 @@ class SessionManagerIntegrationSpec extends CpsPersistenceSpecBase{ CpsSessionFactory cpsSessionFactory def sessionId - def shortTimeoutForTesting = 200L + def shortTimeoutForTesting = 300L def setup(){ sessionId = objectUnderTest.startSession() diff --git a/docs/cps-events.rst b/docs/cps-events.rst new file mode 100644 index 0000000000..a28d4b0529 --- /dev/null +++ b/docs/cps-events.rst @@ -0,0 +1,138 @@ +.. This work is licensed under a Creative Commons Attribution 4.0 International License. +.. http://creativecommons.org/licenses/by/4.0 +.. Copyright (C) 2022 Nordix Foundation + +.. DO NOT CHANGE THIS LABEL FOR RELEASE NOTES - EVEN THOUGH IT GIVES A WARNING +.. _cpsEvents: + +CPS Events +########## + +CPS Core +******** +.. + Cps core events yet to be written + + +CPS-NCMP +******** + +Lifecycle Management (LCM) Event +================================ + + +Overview +-------- +Lifecycle management events are published as cm handle state transitions from one state to another. + + +LCM events and state handler +---------------------------- +The LCM events are triggered under the state handler which has the following responsibilities: + +#. Updating and persisting cm handle state based on the target state of the cm handle + +#. Create and calls to publish the LCM event based on the cm handle state transition that occured + + **3 possible event types:** + + * Create + * Update + * Delete + + + +LCM Event Schema +---------------- +The current published LCM event is based on the following schema: + +:download:`Life cycle management event schema <schemas/lcm-event-schema-v1.json>` + +LCM Event structure +------------------- + +Events header +^^^^^^^^^^^^^ +*Event header prototype for all event types* + +.. code-block:: json + + { + "eventId" : "00001", + "eventCorrelationId : "cmhandle-001", + "eventTime" : "2021-11-16T16:42:25-04:00", + "eventSource" : "org.onap.ncmp", + "eventType" : "org.onap.ncmp.cmhandle-lcm-event.create", + "eventSchema" : "org.onap.ncmp:cmhandle-lcm-event", + "eventSchemaVersion" : "1.0" + "event": .... + } + +Events payload +^^^^^^^^^^^^^^ +Event payload varies based on the type of event. + +**CREATE** + +Event payload for this event contains the properties of the new cm handle created. + +*Create event payload prototype* + +.. code-block:: json + + "event": { + "cmHandleId" : "cmhandle-001", + "newValues" : { + "cmHandleState" : "ADVISED", + "dataSyncEnabled" : "TRUE", + "cmhandleProperties" : [ + "prop1" : "val1", + "prop2" : "val2" + ] + } + } + } + + +**UPDATE** + +Event payload for this event contains the difference in state and properties of the cm handle. + +*Update event payload prototype* + +.. code-block:: json + + "event": { + "cmHandleId" : "cmhandle-001", + "oldValues" : { + "cmHandleState" : "ADVISED", + "dataSyncEnabled" : "FALSE", + "cmhandleProperties" : [ + "prop1" : "val1", + "prop2" : "val2", + } + "newValues" : { + "cmHandleState" : "READY", + "dataSyncEnabled" : "TRUE", + "cmhandleProperties" : [ + "prop1" : "updatedval1", + "prop2" : "updatedval2" + ] + } + } + } + + +**DELETE** + +Event payload for this event contains the identifier of the deleted cm handle. + +*Delete event payload prototype* + +.. code-block:: json + + "event": { + "cmHandleId" : "cmhandle-001", + } + + diff --git a/docs/index.rst b/docs/index.rst index eaf36466f4..df4ea95185 100755 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,6 +1,6 @@ .. This work is licensed under a Creative Commons Attribution 4.0 International License. .. http://creativecommons.org/licenses/by/4.0 -.. Copyright (C) 2021 Nordix Foundation +.. Copyright (C) 2021-2022 Nordix Foundation .. DO NOT CHANGE THIS LABEL FOR RELEASE NOTES - EVEN THOUGH IT GIVES A WARNING .. _master_index: @@ -22,6 +22,7 @@ CPS Core admin-guide.rst design.rst modeling.rst + cps-events.rst deployment.rst release-notes.rst diff --git a/docs/schemas/lcm-event-schema-v1.json b/docs/schemas/lcm-event-schema-v1.json new file mode 100644 index 0000000000..97c0fbee22 --- /dev/null +++ b/docs/schemas/lcm-event-schema-v1.json @@ -0,0 +1,106 @@ +{ + + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.ncmp.cmhandle.lcm-event:v1", + + "$ref": "#/definitions/LcmEvent", + + "definitions": { + + "Values": { + "description": "Values that represents the state of a cmHandle", + "type": "object", + "properties": { + "dataSyncEnabled":{ + "description": "Whether data sync enabled", + "type": "boolean" + }, + "cmHandleState": { + "description": "State of cmHandle", + "type": "string", + "enum": ["ADVISED", "READY", "LOCKED", "DELETING", "DELETED"] + }, + "cmHandleProperties": { + "description": "cmHandle properties", + "type": "object", + "default": null, + "existingJavaType": "java.util.List<java.util.Map<String,String>>", + "additionalProperties": false + } + }, + "additionalProperties": false + }, + + "Event": { + "description": "The Payload of an event", + "type": "object", + "properties": { + "cmHandleId": { + "description": "cmHandle id", + "type": "string" + }, + "oldValues": { + "$ref": "#/definitions/Values" + }, + "newValues": { + "$ref": "#/definitions/Values" + } + }, + "required": [ + "cmHandleId" + ], + "additionalProperties": false + }, + + "LcmEvent": { + "description": "The payload for LCM event", + "type": "object", + "javaType" : "org.onap.ncmp.cmhandle.event.lcm.LcmEvent", + "properties": { + "eventId": { + "description": "The unique id identifying the event", + "type": "string" + }, + "eventCorrelationId": { + "description": "The id identifying the event", + "type": "string" + }, + "eventTime": { + "description": "The timestamp when original event occurred", + "type": "string" + }, + "eventSource": { + "description": "The source of the event", + "type": "string" + }, + "eventType": { + "description": "The type of the event", + "type": "string" + }, + "eventSchema": { + "description": "The schema that this event adheres to", + "type": "string" + }, + "eventSchemaVersion": { + "description": "The version of the schema that this event adheres to", + "type": "string" + }, + "event": { + "$ref": "#/definitions/Event" + } + }, + "required": [ + "eventId", + "eventCorrelationId", + "eventTime", + "eventSource", + "eventType", + "eventSchema", + "eventSchemaVersion", + "event" + ], + "additionalProperties": false + } + + } +} |