diff options
24 files changed, 626 insertions, 297 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 17551344fe..f7fa7356a3 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -167,7 +167,7 @@ dmi: timers:
advised-modules-sync:
- sleep-time-ms: 30000
+ sleep-time-ms: 5000
locked-modules-sync:
sleep-time-ms: 300000
cm-handle-data-sync:
diff --git a/cps-ncmp-service/lombok.config b/cps-ncmp-service/lombok.config index b60a192069..1fba85bb7b 100644 --- a/cps-ncmp-service/lombok.config +++ b/cps-ncmp-service/lombok.config @@ -18,4 +18,3 @@ config.stopBubbling = true lombok.addLombokGeneratedAnnotation = true -lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java new file mode 100644 index 0000000000..abde4c2d54 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.embeddedcache; + +import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.NamedConfig; +import com.hazelcast.config.QueueConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.onap.cps.spi.model.DataNode; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases. + */ +@Configuration +public class SynchronizationCacheConfig { + + private static final QueueConfig commonQueueConfig = createQueueConfig(); + private static final MapConfig moduleSyncStartedConfig = + createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1)); + private static final MapConfig dataSyncSemaphoresConfig = + createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30)); + + /** + * Module Sync Distributed Queue Instance. + * + * @return queue of cm handles (data nodes) that need module sync + */ + @Bean + public BlockingQueue<DataNode> moduleSyncWorkQueue() { + return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig) + .getQueue("moduleSyncWorkQueue"); + } + + /** + * Module Sync started (and maybe finished) on cm handles (ids). + * + * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed + */ + @Bean + public Map<String, Object> moduleSyncStartedOnCmHandles() { + return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig) + .getMap("moduleSyncStartedOnCmHandles"); + } + + /** + * Data Sync Distributed Map Instance. + * + * @return configured map of data sync semaphores + */ + @Bean + public Map<String, Boolean> dataSyncSemaphores() { + return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig) + .getMap("dataSyncSemaphores"); + } + + private HazelcastInstance createHazelcastInstance( + final String hazelcastInstanceName, final NamedConfig namedConfig) { + return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig)); + } + + private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) { + final Config config = new Config(instanceName); + if (namedConfig instanceof MapConfig) { + config.addMapConfig((MapConfig) namedConfig); + } + if (namedConfig instanceof QueueConfig) { + config.addQueueConfig((QueueConfig) namedConfig); + } + config.setClusterName("synchronization-caches"); + return config; + } + + private static QueueConfig createQueueConfig() { + final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig"); + commonQueueConfig.setBackupCount(3); + commonQueueConfig.setAsyncBackupCount(3); + return commonQueueConfig; + } + + private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) { + final MapConfig mapConfig = new MapConfig(configName); + mapConfig.setBackupCount(3); + mapConfig.setAsyncBackupCount(3); + mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds); + return mapConfig; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java deleted file mode 100644 index 571558ac0c..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ===========LICENSE_START======================================================== - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.config.embeddedcache; - -import com.hazelcast.config.Config; -import com.hazelcast.config.MapConfig; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Core infrastructure of the hazelcast distributed map for Module Sync and Data Sync use cases. - */ -@Configuration -public class SynchronizationSemaphoresConfig { - - private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30); - - /** - * Module Sync Distributed Map Instance. - * - * @return configured map of module sync semaphores - */ - @Bean - public ConcurrentMap<String, Boolean> moduleSyncSemaphores() { - return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig") - .getMap("moduleSyncSemaphores"); - } - - /** - * Data Sync Distributed Map Instance. - * - * @return configured map of data sync semaphores - */ - @Bean - public ConcurrentMap<String, Boolean> dataSyncSemaphores() { - return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig") - .getMap("dataSyncSemaphores"); - } - - private HazelcastInstance createHazelcastInstance( - final String hazelcastInstanceName, final String configMapName) { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig(hazelcastInstanceName, configMapName)); - } - - private Config initializeDefaultMapConfig(final String instanceName, final String configName) { - final Config config = new Config(instanceName); - final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS); - mapConfig.setBackupCount(3); - mapConfig.setAsyncBackupCount(3); - config.addMapConfig(mapConfig); - config.setClusterName("synchronization-semaphores"); - return config; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/InventoryPersistence.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/InventoryPersistence.java index c059ece0d3..7a7ef66668 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/InventoryPersistence.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/InventoryPersistence.java @@ -27,8 +27,10 @@ import static org.onap.cps.spi.CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED; import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS; import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -157,6 +159,19 @@ public class InventoryPersistence { } /** + * Method to save batch of cm handles. + * + * @param yangModelCmHandles cm handle represented as Yang Models + */ + public void saveCmHandleBatch(final Collection<YangModelCmHandle> yangModelCmHandles) { + final List<String> cmHandlesJsonData = new ArrayList<>(); + yangModelCmHandles.forEach(yangModelCmHandle -> cmHandlesJsonData.add( + String.format("{\"cm-handles\":[%s]}", jsonObjectMapper.asJsonString(yangModelCmHandle)))); + cpsDataService.saveListElementsBatch(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, + NCMP_DMI_REGISTRY_PARENT, cmHandlesJsonData, NO_TIMESTAMP); + } + + /** * Method to delete a list or a list element. * * @param listElementXpath list element xPath diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java index 45ba078044..107f8a04bb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java @@ -21,7 +21,7 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.time.OffsetDateTime; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -29,7 +29,6 @@ import org.onap.cps.api.CpsDataService; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -47,8 +46,7 @@ public class DataSyncWatchdog { private final SyncUtils syncUtils; - @Qualifier("dataSyncSemaphores") - private final ConcurrentMap<String, Boolean> dataSyncSemaphores; + private final Map<String, Boolean> dataSyncSemaphores; /** * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java index c574aa61d9..7f61c476d5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java @@ -86,19 +86,17 @@ public class ModuleSyncService { } /** - * Deletes the SchemaSet for provided cmHandle if the SchemaSet Exists. + * Deletes the SchemaSet for schema set id if the SchemaSet Exists. * - * @param yangModelCmHandle the yang model of cm handle. + * @param schemaSetId the schema set id to be deleted */ - public void deleteSchemaSetIfExists(final YangModelCmHandle yangModelCmHandle) { - final String schemaSetAndAnchorName = yangModelCmHandle.getId(); + public void deleteSchemaSetIfExists(final String schemaSetId) { try { - cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetAndAnchorName, + cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetId, CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED); - log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetAndAnchorName); + log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetId); } catch (final SchemaSetNotFoundException e) { - log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", - schemaSetAndAnchorName); + log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", schemaSetId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java new file mode 100644 index 0000000000..5e26650eda --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; +import org.onap.cps.ncmp.api.impl.utils.YangDataConverter; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.CmHandleState; +import org.onap.cps.ncmp.api.inventory.CompositeState; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.api.inventory.LockReasonCategory; +import org.onap.cps.spi.model.DataNode; +import org.springframework.stereotype.Component; + +@RequiredArgsConstructor +@Component +@Slf4j +public class ModuleSyncTasks { + private final InventoryPersistence inventoryPersistence; + private final SyncUtils syncUtils; + private final ModuleSyncService moduleSyncService; + private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; + + private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null); + + /** + * Perform module sync on a batch of cm handles. + * + * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @return completed future to handle post-processing + */ + public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) { + final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>(); + for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { + final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); + final YangModelCmHandle yangModelCmHandle = + YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(cmHandleId); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); + } catch (final Exception e) { + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + } + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + } + updateCmHandlesStateBatch(cmHandelStatePerCmHandle); + return COMPLETED_FUTURE; + } + + /** + * Reset state to "ADVISED" for any previously failed cm handles. + * + * @param failedCmHandles previously failed (locked) cm handles + * @return completed future to handle post-processing + */ + public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) { + final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size()); + for (final YangModelCmHandle failedCmHandle : failedCmHandles) { + final CompositeState compositeState = failedCmHandle.getCompositeState(); + final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); + if (isReadyForRetry) { + log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog", + failedCmHandle.getId()); + cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED); + } + } + updateCmHandlesStateBatch(cmHandleStatePerCmHandle); + return COMPLETED_FUTURE; + } + + private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, + final CompositeState.LockReason lockReason) { + advisedCmHandle.getCompositeState().setLockReason(lockReason); + } + + private void updateCmHandlesStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) { + // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13) + for (final Map.Entry<YangModelCmHandle, CmHandleState> entry : cmHandleStatePerCmHandle.entrySet()) { + lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue()); + } + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index be811a1147..8074fe6fe1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -21,17 +21,16 @@ package org.onap.cps.ncmp.api.inventory.sync; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; -import org.onap.cps.ncmp.api.inventory.CmHandleState; -import org.onap.cps.ncmp.api.inventory.CompositeState; -import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.onap.cps.ncmp.api.inventory.LockReasonCategory; -import org.springframework.beans.factory.annotation.Qualifier; +import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -40,75 +39,75 @@ import org.springframework.stereotype.Component; @Component public class ModuleSyncWatchdog { - private static final boolean MODEL_SYNC_IN_PROGRESS = false; - private static final boolean MODEL_SYNC_DONE = true; - - private final InventoryPersistence inventoryPersistence; - private final SyncUtils syncUtils; + private final BlockingQueue<DataNode> moduleSyncWorkQueue; + private final Map<String, Object> moduleSyncStartedOnCmHandles; + private final ModuleSyncTasks moduleSyncTasks; - private final ModuleSyncService moduleSyncService; - - @Qualifier("moduleSyncSemaphores") - private final ConcurrentMap<String, Boolean> moduleSyncSemaphores; - - private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; + private static final int MODULE_SYNC_BATCH_SIZE = 100; + private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; + private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ - @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") - public void executeAdvisedCmHandlePoll() { - syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> { - final String cmHandleId = advisedCmHandle.getId(); - if (hasPushedIntoSemaphoreMap(cmHandleId)) { - log.debug("executing module sync on {}", cmHandleId); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.READY); - updateModuleSyncSemaphoreMap(cmHandleId); - } catch (final Exception e) { - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); - setCmHandleStateLocked(advisedCmHandle, compositeState.getLockReason()); - } - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); - } else { - log.debug("{} already processed by another instance", cmHandleId); - } - }); - log.debug("No Cm-Handles currently found in an ADVISED state"); + @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") + public void moduleSyncAdvisedCmHandles() { + populateWorkQueueIfNeeded(); + while (!moduleSyncWorkQueue.isEmpty()) { + final Collection<DataNode> nextBatch = prepareNextBatch(); + moduleSyncTasks.performModuleSync(nextBatch); + preventBusyWait(); + } } /** - * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'. + * Find any failed (locked) cm handles and change state back to 'ADVISED'. */ @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}") - public void executeLockedCmHandlePoll() { - final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); - for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) { - final CompositeState compositeState = lockedCmHandle.getCompositeState(); - final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); - if (isReadyForRetry) { - log.debug("Reset cm handle {} state to ADVISED to re-attempt module-sync", lockedCmHandle.getId()); - lcmEventsCmHandleStateHandler.updateCmHandleState(lockedCmHandle, CmHandleState.ADVISED); - } - } + public void resetPreviouslyFailedCmHandles() { + final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); + moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } - private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, - final CompositeState.LockReason lockReason) { - advisedCmHandle.getCompositeState().setLockReason(lockReason); - lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.LOCKED); + private void preventBusyWait() { + // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution + // but leaving here to minimize impacts on this class for that Jira + try { + TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } } - private void updateModuleSyncSemaphoreMap(final String cmHandleId) { - moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE); + private void populateWorkQueueIfNeeded() { + if (moduleSyncWorkQueue.isEmpty()) { + final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles(); + for (final DataNode advisedCmHandle : advisedCmHandles) { + if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); + } + } + } } - private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { - return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null; + private Collection<DataNode> prepareNextBatch() { + final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE); + log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size()); + for (final DataNode batchCandidate : nextBatchCandidates) { + final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id")); + final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP + .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP)); + if (alreadyAddedToInProgressMap) { + log.debug("module sync for {} already in progress by other instance", cmHandleId); + } else { + nextBatch.add(batchCandidate); + } + } + log.debug("nextBatch size : {}", nextBatch.size()); + return nextBatch; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java index 16fb8f44d8..537f50122c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java @@ -64,19 +64,14 @@ public class SyncUtils { private static final Pattern retryAttemptPattern = Pattern.compile("^Attempt #(\\d+) failed:"); /** - * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing. + * Query data nodes for cm handles with an "ADVISED" cm handle state. * - * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found + * @return cm handles (data nodes) in ADVISED state (empty list if none found) */ - public List<YangModelCmHandle> getAdvisedCmHandles() { - final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>( - cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED)); - log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size()); - if (advisedCmHandlesAsDataNodeList.isEmpty()) { - return Collections.emptyList(); - } - Collections.shuffle(advisedCmHandlesAsDataNodeList); - return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList); + public List<DataNode> getAdvisedCmHandles() { + final List<DataNode> advisedCmHandlesAsDataNodes = cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED); + log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size()); + return advisedCmHandlesAsDataNodes; } /** diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy index ea84b440f3..80aa81b047 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy @@ -17,33 +17,38 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ - package org.onap.cps.ncmp.api.impl.config.embeddedcache - import com.hazelcast.core.Hazelcast +import org.onap.cps.spi.model.DataNode import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.context.ContextConfiguration import spock.lang.Specification +import java.util.concurrent.BlockingQueue @SpringBootTest -@ContextConfiguration(classes = [SynchronizationSemaphoresConfig]) -class SynchronizationSemaphoresConfigSpec extends Specification { +@ContextConfiguration(classes = [SynchronizationCacheConfig]) +class SynchronizationCacheConfigSpec extends Specification { + + @Autowired + private BlockingQueue<DataNode> moduleSyncWorkQueue @Autowired - private Map<String, Boolean> moduleSyncSemaphores; + private Map<String, Object> moduleSyncStartedOnCmHandles @Autowired - private Map<String, Boolean> dataSyncSemaphores; + private Map<String, Boolean> dataSyncSemaphores - def 'Embedded Sync Semaphores'() { - expect: 'system is able to create an instance of ModuleSyncSemaphores' - assert null != moduleSyncSemaphores - and: 'system is able to create an instance of DataSyncSemaphores' + def 'Embedded (hazelcast) Caches for Module and Data Sync.'() { + expect: 'system is able to create an instance of the Module Sync Work Queue' + assert null != moduleSyncWorkQueue + and: 'system is able to create an instance of a map to hold cm handles which have started (and maybe finished) module sync' + assert null != moduleSyncStartedOnCmHandles + and: 'system is able to create an instance of a map to hold data sync semaphores' assert null != dataSyncSemaphores - and: 'we have 2 instances' - assert Hazelcast.allHazelcastInstances.size() == 2 - and: 'the names match' - assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores'] + and: 'there 3 instances' + assert Hazelcast.allHazelcastInstances.size() == 3 + and: 'they have the correct names (in any order)' + assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' ) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/InventoryPersistenceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/InventoryPersistenceSpec.groovy index 7ffec1ab09..76f10de831 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/InventoryPersistenceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/InventoryPersistenceSpec.groovy @@ -202,6 +202,22 @@ class InventoryPersistenceSpec extends Specification { } } + def 'Save Multiple Cmhandles'() { + given: 'cm handles represented as Yang Model' + def yangModelCmHandle1 = new YangModelCmHandle(id: 'cmhandle1') + def yangModelCmHandle2 = new YangModelCmHandle(id: 'cmhandle2') + when: 'the cm handles are saved' + objectUnderTest.saveCmHandleBatch([yangModelCmHandle1, yangModelCmHandle2]) + then: 'CPS Data Service persists both cm handles as a batch' + 1 * mockCpsDataService.saveListElementsBatch('NCMP-Admin','ncmp-dmi-registry','/dmi-registry',_,null) >> { + args -> { + def jsonDataList = (args[3] as List) + (jsonDataList[0] as String).contains('cmhandle1') + (jsonDataList[0] as String).contains('cmhandle2') + } + } + } + def 'Delete list or list elements'() { when: 'the method to delete list or list elements is called' objectUnderTest.deleteListOrListElement('sample xPath') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy index 6a2fbe8e7b..78da7eb747 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy @@ -24,8 +24,6 @@ import org.onap.cps.api.CpsAdminService import org.onap.cps.api.CpsModuleService import org.onap.cps.ncmp.api.impl.operations.DmiModelOperations import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.inventory.CmHandleState -import org.onap.cps.ncmp.api.inventory.CompositeState import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle import org.onap.cps.spi.CascadeDeleteAllowed import org.onap.cps.spi.exceptions.SchemaSetNotFoundException @@ -34,7 +32,6 @@ import spock.lang.Specification class ModuleSyncServiceSpec extends Specification { - def mockCpsModuleService = Mock(CpsModuleService) def mockDmiModelOperations = Mock(DmiModelOperations) def mockCpsAdminService = Mock(CpsAdminService) @@ -72,38 +69,27 @@ class ModuleSyncServiceSpec extends Specification { } def 'Delete Schema Set for CmHandle' () { - given: 'a CmHandle in the advised state' - def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED)) - and: 'the Schema Set exists for the CmHandle' - 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id', - CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) when: 'delete schema set if exists is called' - objectUnderTest.deleteSchemaSetIfExists(cmHandle) - then: 'there are no exceptions' - noExceptionThrown() + objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id') + then: 'the module service is invoked to delete the correct schema set' + 1 * mockCpsModuleService.deleteSchemaSet(expectedDataspaceName, 'some-cmhandle-id', CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) } def 'Delete a non-existing Schema Set for CmHandle' () { - given: 'a CmHandle in the advised state' - def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED)) - and: 'the DB throws an exception because its Schema Set does not exist' - 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id', - CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') } + given: 'the DB throws an exception because its Schema Set does not exist' + mockCpsModuleService.deleteSchemaSet(*_) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') } when: 'delete schema set if exists is called' - objectUnderTest.deleteSchemaSetIfExists(cmHandle) - then: 'there are no exceptions' + objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id') + then: 'the exception from the DB is ignored; there are no exceptions' noExceptionThrown() } def 'Delete Schema Set for CmHandle with other exception' () { - given: 'a CmHandle in the advised state' - def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED)) - and: 'an exception other than SchemaSetNotFoundException is thrown' + given: 'an exception other than SchemaSetNotFoundException is thrown' UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException(); - 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id', - CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw unsupportedOperationException } + 1 * mockCpsModuleService.deleteSchemaSet(*_) >> { throw unsupportedOperationException } when: 'delete schema set if exists is called' - objectUnderTest.deleteSchemaSetIfExists(cmHandle) + objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id') then: 'an exception is thrown' def result = thrown(UnsupportedOperationException) result == unsupportedOperationException diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy new file mode 100644 index 0000000000..291ba968ff --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * Modifications Copyright (C) 2022 Bell Canada + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync + +import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle +import org.onap.cps.ncmp.api.inventory.CmHandleState +import org.onap.cps.ncmp.api.inventory.CompositeState +import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder +import org.onap.cps.ncmp.api.inventory.InventoryPersistence +import org.onap.cps.ncmp.api.inventory.LockReasonCategory +import org.onap.cps.spi.model.DataNode +import spock.lang.Specification + +class ModuleSyncTasksSpec extends Specification { + + def mockInventoryPersistence = Mock(InventoryPersistence) + + def mockSyncUtils = Mock(SyncUtils) + + def mockModuleSyncService = Mock(ModuleSyncService) + + def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler) + + def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler) + + def 'Module Sync ADVISED cm handles.'() { + given: 'cm handles in an ADVISED state' + def cmHandle1 = advisedCmHandleAsDataNode('cm-handle-1') + def cmHandle2 = advisedCmHandleAsDataNode('cm-handle-2') + and: 'the inventory persistence cm handle returns a ADVISED state for the any handle' + mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED) + when: 'module sync poll is executed' + objectUnderTest.performModuleSync([cmHandle1, cmHandle2]) + then: 'module sync service deletes schemas set of each cm handle if it already exists' + 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1') + 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2') + and: 'module sync service is invoked for each cm handle' + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') } + and: 'the state handler is called for the both cm handles' + 2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY) + } + + def 'Module Sync ADVISED cm handle with failure during sync.'() { + given: 'a cm handle in an ADVISED state' + def cmHandle = advisedCmHandleAsDataNode('cm-handle') + and: 'the inventory persistence cm handle returns a ADVISED state for the cm handle' + def cmHandleState = new CompositeState(cmHandleState: CmHandleState.ADVISED) + 1 * mockInventoryPersistence.getCmHandleState('cm-handle') >> cmHandleState + and: 'module sync service attempts to sync the cm handle and throws an exception' + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') } + when: 'module sync is executed' + objectUnderTest.performModuleSync([cmHandle]) + then: 'update lock reason, details and attempts is invoked' + 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception') + and: 'the state handler is called to update the state to LOCKED' + 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED) + } + + def 'Reset failed CM Handles #scenario.'() { + given: 'cm handles in an locked state' + def lockedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED) + .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build() + def yangModelCmHandle1 = new YangModelCmHandle(id: 'cm-handle-1', compositeState: lockedState) + def yangModelCmHandle2 = new YangModelCmHandle(id: 'cm-handle-2', compositeState: lockedState) + and: 'sync utils retry locked cm handle returns #isReadyForRetry' + mockSyncUtils.isReadyForRetry(lockedState) >>> isReadyForRetry + when: 'resetting failed cm handles' + objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2]) + then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry' + expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED) + where: + scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState + 'retry locked cm handle once' | [true, false] || 1 + 'retry locked cm handle twice' | [true, true] || 2 + 'do not retry locked cm handle' | [false, false] || 0 + } + + def advisedCmHandleAsDataNode(cmHandleId) { + return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED']) + } + + def assertYamgModelCmHandleArgument(args, expectedCmHandleId) { + { + def yangModelCmHandle = args[0] + assert yangModelCmHandle.id == expectedCmHandleId + } + return true + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 81268cbc0b..43f492dbd7 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -21,98 +21,56 @@ package org.onap.cps.ncmp.api.inventory.sync - -import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.inventory.CmHandleState -import org.onap.cps.ncmp.api.inventory.CompositeState -import org.onap.cps.ncmp.api.inventory.InventoryPersistence -import org.onap.cps.ncmp.api.inventory.LockReasonCategory -import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder -import spock.lang.Specification -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentMap +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import org.onap.cps.spi.model.DataNode +import spock.lang.Specification class ModuleSyncWatchdogSpec extends Specification { - def mockInventoryPersistence = Mock(InventoryPersistence) - def mockSyncUtils = Mock(SyncUtils) - def mockModuleSyncService = Mock(ModuleSyncService) + def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE - def stubbedMap = Stub(ConcurrentMap) + BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) - def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler) + def moduleSyncStartedOnCmHandles = [:] - def cmHandleState = CmHandleState.ADVISED + def mockModuleSyncTasks = Mock(ModuleSyncTasks) - def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap, mockLcmEventsCmHandleStateHandler) + def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks) - def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles'() { - given: 'cm handles in an advised state and a data sync state' - def compositeState1 = new CompositeState(cmHandleState: cmHandleState) - def compositeState2 = new CompositeState(cmHandleState: cmHandleState) - def yangModelCmHandle1 = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState1) - def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2) - and: 'sync utilities return a cm handle twice' - mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2] - when: 'module sync poll is executed' - objectUnderTest.executeAdvisedCmHandlePoll() - then: 'the inventory persistence cm handle returns a composite state for the first cm handle' - 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState1 - and: 'module sync service deletes schema set of cm handle if it exists' - 1 * mockModuleSyncService.deleteSchemaSetIfExists(yangModelCmHandle1) - and: 'module sync service syncs the first cm handle and creates a schema set' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle1) - then: 'the state handler is called for the first cm handle' - 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle1, CmHandleState.READY) - and: 'the inventory persistence cm handle returns a composite state for the second cm handle' - mockInventoryPersistence.getCmHandleState('some-cm-handle-2') >> compositeState2 - and: 'module sync service syncs the second cm handle and creates a schema set' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle2) - then: 'the state handler is called for the second cm handle' - 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle2, CmHandleState.READY) + def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() { + given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles' + mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) + when: ' module sync is started' + objectUnderTest.moduleSyncAdvisedCmHandles() + then: 'it performs #expectedNumberOfTaskExecutions tasks' + expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_) + where: + scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions + 'less then 1 batch' | 1 || 1 + 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 + '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 + 'queue capacity' | testQueueCapacity || 3 + 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 } - def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handle with failure'() { - given: 'cm handles in an advised state' - def compositeState = new CompositeState(cmHandleState: cmHandleState) - def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState) - and: 'sync utilities return a cm handle' - mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle] - when: 'module sync poll is executed' - objectUnderTest.executeAdvisedCmHandlePoll() - then: 'the inventory persistence cm handle returns a composite state for the cm handle' - 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState - and: 'module sync service attempts to sync the cm handle and throws an exception' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') } - and: 'update lock reason, details and attempts is invoked' - 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(compositeState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception') - and: 'the state handler is called to update the state to LOCKED' - 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.LOCKED) + def 'Reset failed cm handles.'() { + given: 'sync utilities returns failed cm handles' + def failedCmHandles = [new YangModelCmHandle()] + mockSyncUtils.getModuleSyncFailedCmHandles() >> failedCmHandles + when: ' reset failed cm handles is started' + objectUnderTest.resetPreviouslyFailedCmHandles() + then: 'it is delegated to the module sync task (service)' + 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles) } - def 'Schedule a Cm-Handle Sync with condition #scenario '() { - given: 'cm handles in an locked state' - def compositeState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED) - .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build() - def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState) - and: 'sync utilities return a cm handle twice' - mockSyncUtils.getModuleSyncFailedCmHandles() >> [yangModelCmHandle, yangModelCmHandle] - and: 'inventory persistence returns the composite state of the cm handle' - mockInventoryPersistence.getCmHandleState(yangModelCmHandle.getId()) >> compositeState - and: 'sync utils retry locked cm handle returns #isReadyForRetry' - mockSyncUtils.isReadyForRetry(compositeState) >>> isReadyForRetry - when: 'module sync poll is executed' - objectUnderTest.executeLockedCmHandlePoll() - then: 'the first cm handle is updated to state "ADVISED" from "READY"' - expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.ADVISED) - where: - scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState - 'retry locked cm handle once' | [true, false] || 1 - 'retry locked cm handle twice' | [true, true] || 2 - 'do not retry locked cm handle' | [false, false] || 0 + def createDataNodes(numberOfDataNodes) { + def dataNodes = [] + (1..numberOfDataNodes).each {dataNodes.add(new DataNode())} + return dataNodes } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy index 52fb110b33..6ccdcf12d3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy @@ -47,8 +47,6 @@ import java.util.stream.Collectors class SyncUtilsSpec extends Specification{ - def mockInventoryPersistence = Mock(InventoryPersistence) - def mockCmHandleQueries = Mock(CmHandleQueries) def mockDmiDataOperations = Mock(DmiDataOperations) @@ -63,28 +61,14 @@ class SyncUtilsSpec extends Specification{ @Shared def dataNode = new DataNode(leaves: ['id': 'cm-handle-123']) - @Shared - def dataNodeAdditionalProperties = new DataNode(leaves: ['name': 'dmiProp1', 'value': 'dmiValue1']) - def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() { given: 'the inventory persistence service returns a collection of data nodes' mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection - and: 'we have some additional (dmi, private) properties' - dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]' - dataNode.childDataNodes = [dataNodeAdditionalProperties] when: 'get advised cm handles are fetched' def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles() then: 'the returned data node collection is the correct size' yangModelCmHandles.size() == expectedDataNodeSize - and: 'if there is a data node the additional (dmi, private) properties are included' - if (expectedDataNodeSize > 0) { - assert yangModelCmHandles[0].dmiProperties[0].name == 'dmiProp1' - assert yangModelCmHandles[0].dmiProperties[0].value == 'dmiValue1' - } - and: 'yang model collection contains the correct data' - yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) == - dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet()) where: 'the following scenarios are used' scenario | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize 'exists' | [dataNode] || 1 | 1 diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy new file mode 100644 index 0000000000..20d384fa53 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.utils + +import org.onap.cps.ncmp.api.impl.utils.YangDataConverter +import org.onap.cps.spi.model.DataNode +import spock.lang.Specification + +class YangDataConverterSpec extends Specification{ + + def 'Convert a cm handle data node with private properties.'() { + given: 'a datanode with some additional (dmi, private) properties' + def dataNodeAdditionalProperties = new DataNode(xpath:'/additional-properties[@name="dmiProp1"]', + leaves: ['name': 'dmiProp1', 'value': 'dmiValue1']) + def dataNode = new DataNode(childDataNodes:[dataNodeAdditionalProperties]) + when: 'the dataNode is converted' + def yangModelCmHandle = YangDataConverter.convertCmHandleToYangModel(dataNode,'sample-id') + then: 'the converted object has the correct id' + assert yangModelCmHandle.id == 'sample-id' + and: 'the additional (dmi, private) properties are included' + assert yangModelCmHandle.dmiProperties[0].name == 'dmiProp1' + assert yangModelCmHandle.dmiProperties[0].value == 'dmiValue1' + } +} diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java index c4a2c2fe98..61e1d5b569 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java @@ -101,6 +101,16 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, newListElements); } + @Override + @Transactional + public void addListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath, + final Collection<Collection<DataNode>> newListsElements) { + + newListsElements.forEach( + newListElement -> addListElements(dataspaceName, anchorName, parentNodeXpath, newListElement)); + + } + private void addChildDataNodes(final String dataspaceName, final String anchorName, final String parentNodeXpath, final Collection<DataNode> newChildren) { final FragmentEntity parentFragmentEntity = getFragmentByXpath(dataspaceName, anchorName, parentNodeXpath); diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy index fee489d18b..acc243b5b4 100755 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy @@ -157,7 +157,7 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase { } @Sql([CLEAR_DATA, SET_DATA]) - def 'Add multiple new list elements including an element with a child datanode.'() { + def 'Add collection of multiple new list elements including an element with a child datanode.'() { given: 'two new child list elements for an existing parent' def listElementXpaths = ['/parent-201/child-204[@key="NEW1"]', '/parent-201/child-204[@key="NEW2"]'] def listElements = toDataNodes(listElementXpaths) @@ -165,7 +165,7 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase { def grandChild = buildDataNode('/parent-201/child-204[@key="NEW1"]/grand-child-204[@key2="NEW1-CHILD"]', [leave:'value'], []) listElements[0].childDataNodes = [grandChild] when: 'the new data node (list elements) are added to an existing parent node' - objectUnderTest.addListElements(DATASPACE_NAME, ANCHOR_NAME3, '/parent-201', listElements) + objectUnderTest.addListElementsBatch(DATASPACE_NAME, ANCHOR_NAME3, '/parent-201', [listElements]) then: 'new entries are successfully persisted, parent node now contains 5 children (2 new + 3 existing before)' def parentFragment = fragmentRepository.getById(LIST_DATA_NODE_PARENT201_FRAGMENT_ID) def allChildXpaths = parentFragment.childFragments.collect { it.xpath } diff --git a/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java b/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java index decf67d24e..b2e8c5ba42 100644 --- a/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java +++ b/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java @@ -69,6 +69,19 @@ public interface CpsDataService { OffsetDateTime observedTimestamp); /** + * Persists child data fragment representing one or more list elements under existing data node for the + * given anchor and dataspace. + * + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param parentNodeXpath parent node xpath + * @param jsonDataList collection of json data representing list element(s) + * @param observedTimestamp observedTimestamp + */ + void saveListElementsBatch(String dataspaceName, String anchorName, String parentNodeXpath, + Collection<String> jsonDataList, OffsetDateTime observedTimestamp); + + /** * Retrieves datanode by XPath for given dataspace and anchor. * * @param dataspaceName dataspace name diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index 092fd31fcf..6bf493556e 100755 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -92,6 +92,17 @@ public class CpsDataServiceImpl implements CpsDataService { } @Override + public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath, + final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) { + CpsValidator.validateNameCharacters(dataspaceName, anchorName); + final Collection<Collection<DataNode>> listElementDataNodeCollections = + buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonDataList); + cpsDataPersistenceService.addListElementsBatch(dataspaceName, anchorName, parentNodeXpath, + listElementDataNodeCollections); + processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp); + } + + @Override public DataNode getDataNode(final String dataspaceName, final String anchorName, final String xpath, final FetchDescendantsOption fetchDescendantsOption) { CpsValidator.validateNameCharacters(dataspaceName, anchorName); @@ -252,6 +263,13 @@ public class CpsDataServiceImpl implements CpsDataService { } + private Collection<Collection<DataNode>> buildDataNodes(final String dataspaceName, final String anchorName, + final String parentNodeXpath, final Collection<String> jsonDataList) { + return jsonDataList.stream() + .map(jsonData -> buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData)) + .collect(Collectors.toList()); + } + private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName, final String xpath, final Operation operation, final OffsetDateTime observedTimestamp) { try { diff --git a/cps-service/src/main/java/org/onap/cps/spi/CpsDataPersistenceService.java b/cps-service/src/main/java/org/onap/cps/spi/CpsDataPersistenceService.java index 686f0f3fee..8b45ae78d9 100644 --- a/cps-service/src/main/java/org/onap/cps/spi/CpsDataPersistenceService.java +++ b/cps-service/src/main/java/org/onap/cps/spi/CpsDataPersistenceService.java @@ -66,6 +66,17 @@ public interface CpsDataPersistenceService { Collection<DataNode> listElementsCollection); /** + * Adds list child elements to a Fragment. + * + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param parentNodeXpath parent node xpath + * @param listElementsCollections collections of data nodes representing list elements + */ + void addListElementsBatch(String dataspaceName, String anchorName, String parentNodeXpath, + Collection<Collection<DataNode>> listElementsCollections); + + /** * Retrieves datanode by XPath for given dataspace and anchor. * * @param dataspaceName dataspace name diff --git a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java index d80306bae8..c77daafb38 100644 --- a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java +++ b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java @@ -22,6 +22,7 @@ package org.onap.cps.spi.model; +import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -33,7 +34,9 @@ import lombok.Setter; @Setter(AccessLevel.PROTECTED) @Getter @EqualsAndHashCode -public class DataNode { +public class DataNode implements Serializable { + + private static final long serialVersionUID = 1482619410918597467L; DataNode() { } diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy index cb352bccec..ab960df6aa 100644 --- a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy @@ -37,6 +37,7 @@ import org.onap.cps.yang.YangTextSchemaSourceSetBuilder import spock.lang.Specification import java.time.OffsetDateTime +import java.util.stream.Collectors class CpsDataServiceImplSpec extends Specification { def mockCpsDataPersistenceService = Mock(CpsDataPersistenceService) @@ -135,6 +136,26 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockNotificationService.processDataUpdatedEvent(dataspaceName, anchorName, '/test-tree', Operation.UPDATE, observedTimestamp) } + def 'Saving collection of a batch with data fragment under existing node.'() { + given: 'schema set for given anchor and dataspace references test-tree model' + setupSchemaSetMocks('test-tree.yang') + when: 'save data method is invoked with list element json data' + def jsonData = '{"branch": [{"name": "A"}, {"name": "B"}]}' + objectUnderTest.saveListElementsBatch(dataspaceName, anchorName, '/test-tree', [jsonData], observedTimestamp) + then: 'the persistence service method is invoked with correct parameters' + 1 * mockCpsDataPersistenceService.addListElementsBatch(dataspaceName, anchorName, '/test-tree',_) >> { + args -> { + def listElementsCollection = args[3] as Collection<Collection<DataNode>> + assert listElementsCollection.size() == 1 + def listOfXpaths = listElementsCollection.stream().flatMap(x -> x.stream()).map(it-> it.xpath).collect(Collectors.toList()) + assert listOfXpaths.size() == 2 + assert listOfXpaths.containsAll(['/test-tree/branch[@name=\'B\']','/test-tree/branch[@name=\'A\']']) + } + } + and: 'data updated event is sent to notification service' + 1 * mockNotificationService.processDataUpdatedEvent(dataspaceName, anchorName, '/test-tree', Operation.UPDATE, observedTimestamp) + } + def 'Saving empty list element data fragment.'() { given: 'schema set for given anchor and dataspace references test-tree model' setupSchemaSetMocks('test-tree.yang') |