From 6b2cdc63c02cc2d1a9bc9d43997ab14e740d7973 Mon Sep 17 00:00:00 2001 From: ToineSiebelink Date: Mon, 29 Aug 2022 12:35:33 +0100 Subject: Performance Improvement: Use hazelcast blocking queue - Introducing hazelcast for queue and progress map - process batch of 100 at the time - decreased module sync watchdog sleeptime to 5 seconds - separate module sync tasks in new class and some other async preparations and easier testing - tests for batching in module sync watchdog - remove qualifiers annotation (support) where no longer needed Issue-ID: CPS-1210 Issue-ID: CPS-1126 Signed-off-by: ToineSiebelink Change-Id: I0a7d3755bf774e27c5688741bddb01f427d4a8a7 --- cps-application/src/main/resources/application.yml | 2 +- cps-ncmp-service/lombok.config | 1 - .../embeddedcache/SynchronizationCacheConfig.java | 113 +++++++++++++++++++ .../SynchronizationSemaphoresConfig.java | 78 -------------- .../ncmp/api/inventory/sync/DataSyncWatchdog.java | 6 +- .../ncmp/api/inventory/sync/ModuleSyncService.java | 14 ++- .../ncmp/api/inventory/sync/ModuleSyncTasks.java | 113 +++++++++++++++++++ .../api/inventory/sync/ModuleSyncWatchdog.java | 119 ++++++++++----------- .../cps/ncmp/api/inventory/sync/SyncUtils.java | 17 ++- .../SynchronizationCacheConfigSpec.groovy | 54 ++++++++++ .../SynchronizationSemaphoresConfigSpec.groovy | 49 --------- .../inventory/sync/ModuleSyncServiceSpec.groovy | 34 ++---- .../api/inventory/sync/ModuleSyncTasksSpec.groovy | 110 +++++++++++++++++++ .../inventory/sync/ModuleSyncWatchdogSpec.groovy | 112 ++++++------------- .../ncmp/api/inventory/sync/SyncUtilsSpec.groovy | 16 --- .../ncmp/api/utils/YangDataConverterSpec.groovy | 42 ++++++++ .../main/java/org/onap/cps/spi/model/DataNode.java | 5 +- 17 files changed, 555 insertions(+), 330 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 17551344fe..f7fa7356a3 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -167,7 +167,7 @@ dmi: timers: advised-modules-sync: - sleep-time-ms: 30000 + sleep-time-ms: 5000 locked-modules-sync: sleep-time-ms: 300000 cm-handle-data-sync: diff --git a/cps-ncmp-service/lombok.config b/cps-ncmp-service/lombok.config index b60a192069..1fba85bb7b 100644 --- a/cps-ncmp-service/lombok.config +++ b/cps-ncmp-service/lombok.config @@ -18,4 +18,3 @@ config.stopBubbling = true lombok.addLombokGeneratedAnnotation = true -lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java new file mode 100644 index 0000000000..abde4c2d54 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.embeddedcache; + +import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.NamedConfig; +import com.hazelcast.config.QueueConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.onap.cps.spi.model.DataNode; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases. + */ +@Configuration +public class SynchronizationCacheConfig { + + private static final QueueConfig commonQueueConfig = createQueueConfig(); + private static final MapConfig moduleSyncStartedConfig = + createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1)); + private static final MapConfig dataSyncSemaphoresConfig = + createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30)); + + /** + * Module Sync Distributed Queue Instance. + * + * @return queue of cm handles (data nodes) that need module sync + */ + @Bean + public BlockingQueue moduleSyncWorkQueue() { + return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig) + .getQueue("moduleSyncWorkQueue"); + } + + /** + * Module Sync started (and maybe finished) on cm handles (ids). + * + * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed + */ + @Bean + public Map moduleSyncStartedOnCmHandles() { + return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig) + .getMap("moduleSyncStartedOnCmHandles"); + } + + /** + * Data Sync Distributed Map Instance. + * + * @return configured map of data sync semaphores + */ + @Bean + public Map dataSyncSemaphores() { + return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig) + .getMap("dataSyncSemaphores"); + } + + private HazelcastInstance createHazelcastInstance( + final String hazelcastInstanceName, final NamedConfig namedConfig) { + return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig)); + } + + private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) { + final Config config = new Config(instanceName); + if (namedConfig instanceof MapConfig) { + config.addMapConfig((MapConfig) namedConfig); + } + if (namedConfig instanceof QueueConfig) { + config.addQueueConfig((QueueConfig) namedConfig); + } + config.setClusterName("synchronization-caches"); + return config; + } + + private static QueueConfig createQueueConfig() { + final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig"); + commonQueueConfig.setBackupCount(3); + commonQueueConfig.setAsyncBackupCount(3); + return commonQueueConfig; + } + + private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) { + final MapConfig mapConfig = new MapConfig(configName); + mapConfig.setBackupCount(3); + mapConfig.setAsyncBackupCount(3); + mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds); + return mapConfig; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java deleted file mode 100644 index 571558ac0c..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ===========LICENSE_START======================================================== - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.config.embeddedcache; - -import com.hazelcast.config.Config; -import com.hazelcast.config.MapConfig; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Core infrastructure of the hazelcast distributed map for Module Sync and Data Sync use cases. - */ -@Configuration -public class SynchronizationSemaphoresConfig { - - private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30); - - /** - * Module Sync Distributed Map Instance. - * - * @return configured map of module sync semaphores - */ - @Bean - public ConcurrentMap moduleSyncSemaphores() { - return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig") - .getMap("moduleSyncSemaphores"); - } - - /** - * Data Sync Distributed Map Instance. - * - * @return configured map of data sync semaphores - */ - @Bean - public ConcurrentMap dataSyncSemaphores() { - return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig") - .getMap("dataSyncSemaphores"); - } - - private HazelcastInstance createHazelcastInstance( - final String hazelcastInstanceName, final String configMapName) { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig(hazelcastInstanceName, configMapName)); - } - - private Config initializeDefaultMapConfig(final String instanceName, final String configName) { - final Config config = new Config(instanceName); - final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS); - mapConfig.setBackupCount(3); - mapConfig.setAsyncBackupCount(3); - config.addMapConfig(mapConfig); - config.setClusterName("synchronization-semaphores"); - return config; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java index 45ba078044..107f8a04bb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java @@ -21,7 +21,7 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.time.OffsetDateTime; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -29,7 +29,6 @@ import org.onap.cps.api.CpsDataService; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -47,8 +46,7 @@ public class DataSyncWatchdog { private final SyncUtils syncUtils; - @Qualifier("dataSyncSemaphores") - private final ConcurrentMap dataSyncSemaphores; + private final Map dataSyncSemaphores; /** * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java index c574aa61d9..7f61c476d5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java @@ -86,19 +86,17 @@ public class ModuleSyncService { } /** - * Deletes the SchemaSet for provided cmHandle if the SchemaSet Exists. + * Deletes the SchemaSet for schema set id if the SchemaSet Exists. * - * @param yangModelCmHandle the yang model of cm handle. + * @param schemaSetId the schema set id to be deleted */ - public void deleteSchemaSetIfExists(final YangModelCmHandle yangModelCmHandle) { - final String schemaSetAndAnchorName = yangModelCmHandle.getId(); + public void deleteSchemaSetIfExists(final String schemaSetId) { try { - cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetAndAnchorName, + cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetId, CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED); - log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetAndAnchorName); + log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetId); } catch (final SchemaSetNotFoundException e) { - log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", - schemaSetAndAnchorName); + log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", schemaSetId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java new file mode 100644 index 0000000000..5e26650eda --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; +import org.onap.cps.ncmp.api.impl.utils.YangDataConverter; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.CmHandleState; +import org.onap.cps.ncmp.api.inventory.CompositeState; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.api.inventory.LockReasonCategory; +import org.onap.cps.spi.model.DataNode; +import org.springframework.stereotype.Component; + +@RequiredArgsConstructor +@Component +@Slf4j +public class ModuleSyncTasks { + private final InventoryPersistence inventoryPersistence; + private final SyncUtils syncUtils; + private final ModuleSyncService moduleSyncService; + private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; + + private static final CompletableFuture COMPLETED_FUTURE = CompletableFuture.completedFuture(null); + + /** + * Perform module sync on a batch of cm handles. + * + * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @return completed future to handle post-processing + */ + public CompletableFuture performModuleSync(final Collection cmHandlesAsDataNodes) { + final Map cmHandelStatePerCmHandle = new HashMap<>(); + for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { + final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); + final YangModelCmHandle yangModelCmHandle = + YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(cmHandleId); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); + } catch (final Exception e) { + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + } + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + } + updateCmHandlesStateBatch(cmHandelStatePerCmHandle); + return COMPLETED_FUTURE; + } + + /** + * Reset state to "ADVISED" for any previously failed cm handles. + * + * @param failedCmHandles previously failed (locked) cm handles + * @return completed future to handle post-processing + */ + public CompletableFuture resetFailedCmHandles(final List failedCmHandles) { + final Map cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size()); + for (final YangModelCmHandle failedCmHandle : failedCmHandles) { + final CompositeState compositeState = failedCmHandle.getCompositeState(); + final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); + if (isReadyForRetry) { + log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog", + failedCmHandle.getId()); + cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED); + } + } + updateCmHandlesStateBatch(cmHandleStatePerCmHandle); + return COMPLETED_FUTURE; + } + + private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, + final CompositeState.LockReason lockReason) { + advisedCmHandle.getCompositeState().setLockReason(lockReason); + } + + private void updateCmHandlesStateBatch(final Map cmHandleStatePerCmHandle) { + // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13) + for (final Map.Entry entry : cmHandleStatePerCmHandle.entrySet()) { + lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue()); + } + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index be811a1147..8074fe6fe1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -21,17 +21,16 @@ package org.onap.cps.ncmp.api.inventory.sync; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; -import org.onap.cps.ncmp.api.inventory.CmHandleState; -import org.onap.cps.ncmp.api.inventory.CompositeState; -import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.onap.cps.ncmp.api.inventory.LockReasonCategory; -import org.springframework.beans.factory.annotation.Qualifier; +import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -40,75 +39,75 @@ import org.springframework.stereotype.Component; @Component public class ModuleSyncWatchdog { - private static final boolean MODEL_SYNC_IN_PROGRESS = false; - private static final boolean MODEL_SYNC_DONE = true; - - private final InventoryPersistence inventoryPersistence; - private final SyncUtils syncUtils; + private final BlockingQueue moduleSyncWorkQueue; + private final Map moduleSyncStartedOnCmHandles; + private final ModuleSyncTasks moduleSyncTasks; - private final ModuleSyncService moduleSyncService; - - @Qualifier("moduleSyncSemaphores") - private final ConcurrentMap moduleSyncSemaphores; - - private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; + private static final int MODULE_SYNC_BATCH_SIZE = 100; + private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; + private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ - @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") - public void executeAdvisedCmHandlePoll() { - syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> { - final String cmHandleId = advisedCmHandle.getId(); - if (hasPushedIntoSemaphoreMap(cmHandleId)) { - log.debug("executing module sync on {}", cmHandleId); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.READY); - updateModuleSyncSemaphoreMap(cmHandleId); - } catch (final Exception e) { - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); - setCmHandleStateLocked(advisedCmHandle, compositeState.getLockReason()); - } - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); - } else { - log.debug("{} already processed by another instance", cmHandleId); - } - }); - log.debug("No Cm-Handles currently found in an ADVISED state"); + @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") + public void moduleSyncAdvisedCmHandles() { + populateWorkQueueIfNeeded(); + while (!moduleSyncWorkQueue.isEmpty()) { + final Collection nextBatch = prepareNextBatch(); + moduleSyncTasks.performModuleSync(nextBatch); + preventBusyWait(); + } } /** - * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'. + * Find any failed (locked) cm handles and change state back to 'ADVISED'. */ @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}") - public void executeLockedCmHandlePoll() { - final List lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); - for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) { - final CompositeState compositeState = lockedCmHandle.getCompositeState(); - final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); - if (isReadyForRetry) { - log.debug("Reset cm handle {} state to ADVISED to re-attempt module-sync", lockedCmHandle.getId()); - lcmEventsCmHandleStateHandler.updateCmHandleState(lockedCmHandle, CmHandleState.ADVISED); - } - } + public void resetPreviouslyFailedCmHandles() { + final List failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); + moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } - private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, - final CompositeState.LockReason lockReason) { - advisedCmHandle.getCompositeState().setLockReason(lockReason); - lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.LOCKED); + private void preventBusyWait() { + // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution + // but leaving here to minimize impacts on this class for that Jira + try { + TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } } - private void updateModuleSyncSemaphoreMap(final String cmHandleId) { - moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE); + private void populateWorkQueueIfNeeded() { + if (moduleSyncWorkQueue.isEmpty()) { + final List advisedCmHandles = syncUtils.getAdvisedCmHandles(); + for (final DataNode advisedCmHandle : advisedCmHandles) { + if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); + } + } + } } - private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { - return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null; + private Collection prepareNextBatch() { + final Collection nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + final Collection nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE); + log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size()); + for (final DataNode batchCandidate : nextBatchCandidates) { + final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id")); + final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP + .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP)); + if (alreadyAddedToInProgressMap) { + log.debug("module sync for {} already in progress by other instance", cmHandleId); + } else { + nextBatch.add(batchCandidate); + } + } + log.debug("nextBatch size : {}", nextBatch.size()); + return nextBatch; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java index 16fb8f44d8..537f50122c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java @@ -64,19 +64,14 @@ public class SyncUtils { private static final Pattern retryAttemptPattern = Pattern.compile("^Attempt #(\\d+) failed:"); /** - * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing. + * Query data nodes for cm handles with an "ADVISED" cm handle state. * - * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found + * @return cm handles (data nodes) in ADVISED state (empty list if none found) */ - public List getAdvisedCmHandles() { - final List advisedCmHandlesAsDataNodeList = new ArrayList<>( - cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED)); - log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size()); - if (advisedCmHandlesAsDataNodeList.isEmpty()) { - return Collections.emptyList(); - } - Collections.shuffle(advisedCmHandlesAsDataNodeList); - return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList); + public List getAdvisedCmHandles() { + final List advisedCmHandlesAsDataNodes = cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED); + log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size()); + return advisedCmHandlesAsDataNodes; } /** diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy new file mode 100644 index 0000000000..80aa81b047 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.cps.ncmp.api.impl.config.embeddedcache +import com.hazelcast.core.Hazelcast +import org.onap.cps.spi.model.DataNode +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ContextConfiguration +import spock.lang.Specification +import java.util.concurrent.BlockingQueue + +@SpringBootTest +@ContextConfiguration(classes = [SynchronizationCacheConfig]) +class SynchronizationCacheConfigSpec extends Specification { + + @Autowired + private BlockingQueue moduleSyncWorkQueue + + @Autowired + private Map moduleSyncStartedOnCmHandles + + @Autowired + private Map dataSyncSemaphores + + def 'Embedded (hazelcast) Caches for Module and Data Sync.'() { + expect: 'system is able to create an instance of the Module Sync Work Queue' + assert null != moduleSyncWorkQueue + and: 'system is able to create an instance of a map to hold cm handles which have started (and maybe finished) module sync' + assert null != moduleSyncStartedOnCmHandles + and: 'system is able to create an instance of a map to hold data sync semaphores' + assert null != dataSyncSemaphores + and: 'there 3 instances' + assert Hazelcast.allHazelcastInstances.size() == 3 + and: 'they have the correct names (in any order)' + assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' ) + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy deleted file mode 100644 index ea84b440f3..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy +++ /dev/null @@ -1,49 +0,0 @@ -/* - * ============LICENSE_START======================================================== - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.config.embeddedcache - -import com.hazelcast.core.Hazelcast -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.ContextConfiguration -import spock.lang.Specification - -@SpringBootTest -@ContextConfiguration(classes = [SynchronizationSemaphoresConfig]) -class SynchronizationSemaphoresConfigSpec extends Specification { - - @Autowired - private Map moduleSyncSemaphores; - - @Autowired - private Map dataSyncSemaphores; - - def 'Embedded Sync Semaphores'() { - expect: 'system is able to create an instance of ModuleSyncSemaphores' - assert null != moduleSyncSemaphores - and: 'system is able to create an instance of DataSyncSemaphores' - assert null != dataSyncSemaphores - and: 'we have 2 instances' - assert Hazelcast.allHazelcastInstances.size() == 2 - and: 'the names match' - assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores'] - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy index 6a2fbe8e7b..78da7eb747 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy @@ -24,8 +24,6 @@ import org.onap.cps.api.CpsAdminService import org.onap.cps.api.CpsModuleService import org.onap.cps.ncmp.api.impl.operations.DmiModelOperations import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.inventory.CmHandleState -import org.onap.cps.ncmp.api.inventory.CompositeState import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle import org.onap.cps.spi.CascadeDeleteAllowed import org.onap.cps.spi.exceptions.SchemaSetNotFoundException @@ -34,7 +32,6 @@ import spock.lang.Specification class ModuleSyncServiceSpec extends Specification { - def mockCpsModuleService = Mock(CpsModuleService) def mockDmiModelOperations = Mock(DmiModelOperations) def mockCpsAdminService = Mock(CpsAdminService) @@ -72,38 +69,27 @@ class ModuleSyncServiceSpec extends Specification { } def 'Delete Schema Set for CmHandle' () { - given: 'a CmHandle in the advised state' - def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED)) - and: 'the Schema Set exists for the CmHandle' - 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id', - CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) when: 'delete schema set if exists is called' - objectUnderTest.deleteSchemaSetIfExists(cmHandle) - then: 'there are no exceptions' - noExceptionThrown() + objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id') + then: 'the module service is invoked to delete the correct schema set' + 1 * mockCpsModuleService.deleteSchemaSet(expectedDataspaceName, 'some-cmhandle-id', CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) } def 'Delete a non-existing Schema Set for CmHandle' () { - given: 'a CmHandle in the advised state' - def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED)) - and: 'the DB throws an exception because its Schema Set does not exist' - 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id', - CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') } + given: 'the DB throws an exception because its Schema Set does not exist' + mockCpsModuleService.deleteSchemaSet(*_) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') } when: 'delete schema set if exists is called' - objectUnderTest.deleteSchemaSetIfExists(cmHandle) - then: 'there are no exceptions' + objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id') + then: 'the exception from the DB is ignored; there are no exceptions' noExceptionThrown() } def 'Delete Schema Set for CmHandle with other exception' () { - given: 'a CmHandle in the advised state' - def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED)) - and: 'an exception other than SchemaSetNotFoundException is thrown' + given: 'an exception other than SchemaSetNotFoundException is thrown' UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException(); - 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id', - CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw unsupportedOperationException } + 1 * mockCpsModuleService.deleteSchemaSet(*_) >> { throw unsupportedOperationException } when: 'delete schema set if exists is called' - objectUnderTest.deleteSchemaSetIfExists(cmHandle) + objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id') then: 'an exception is thrown' def result = thrown(UnsupportedOperationException) result == unsupportedOperationException diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy new file mode 100644 index 0000000000..291ba968ff --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * Modifications Copyright (C) 2022 Bell Canada + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync + +import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle +import org.onap.cps.ncmp.api.inventory.CmHandleState +import org.onap.cps.ncmp.api.inventory.CompositeState +import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder +import org.onap.cps.ncmp.api.inventory.InventoryPersistence +import org.onap.cps.ncmp.api.inventory.LockReasonCategory +import org.onap.cps.spi.model.DataNode +import spock.lang.Specification + +class ModuleSyncTasksSpec extends Specification { + + def mockInventoryPersistence = Mock(InventoryPersistence) + + def mockSyncUtils = Mock(SyncUtils) + + def mockModuleSyncService = Mock(ModuleSyncService) + + def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler) + + def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler) + + def 'Module Sync ADVISED cm handles.'() { + given: 'cm handles in an ADVISED state' + def cmHandle1 = advisedCmHandleAsDataNode('cm-handle-1') + def cmHandle2 = advisedCmHandleAsDataNode('cm-handle-2') + and: 'the inventory persistence cm handle returns a ADVISED state for the any handle' + mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED) + when: 'module sync poll is executed' + objectUnderTest.performModuleSync([cmHandle1, cmHandle2]) + then: 'module sync service deletes schemas set of each cm handle if it already exists' + 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1') + 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2') + and: 'module sync service is invoked for each cm handle' + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') } + and: 'the state handler is called for the both cm handles' + 2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY) + } + + def 'Module Sync ADVISED cm handle with failure during sync.'() { + given: 'a cm handle in an ADVISED state' + def cmHandle = advisedCmHandleAsDataNode('cm-handle') + and: 'the inventory persistence cm handle returns a ADVISED state for the cm handle' + def cmHandleState = new CompositeState(cmHandleState: CmHandleState.ADVISED) + 1 * mockInventoryPersistence.getCmHandleState('cm-handle') >> cmHandleState + and: 'module sync service attempts to sync the cm handle and throws an exception' + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') } + when: 'module sync is executed' + objectUnderTest.performModuleSync([cmHandle]) + then: 'update lock reason, details and attempts is invoked' + 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception') + and: 'the state handler is called to update the state to LOCKED' + 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED) + } + + def 'Reset failed CM Handles #scenario.'() { + given: 'cm handles in an locked state' + def lockedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED) + .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build() + def yangModelCmHandle1 = new YangModelCmHandle(id: 'cm-handle-1', compositeState: lockedState) + def yangModelCmHandle2 = new YangModelCmHandle(id: 'cm-handle-2', compositeState: lockedState) + and: 'sync utils retry locked cm handle returns #isReadyForRetry' + mockSyncUtils.isReadyForRetry(lockedState) >>> isReadyForRetry + when: 'resetting failed cm handles' + objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2]) + then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry' + expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED) + where: + scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState + 'retry locked cm handle once' | [true, false] || 1 + 'retry locked cm handle twice' | [true, true] || 2 + 'do not retry locked cm handle' | [false, false] || 0 + } + + def advisedCmHandleAsDataNode(cmHandleId) { + return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED']) + } + + def assertYamgModelCmHandleArgument(args, expectedCmHandleId) { + { + def yangModelCmHandle = args[0] + assert yangModelCmHandle.id == expectedCmHandleId + } + return true + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy index 81268cbc0b..43f492dbd7 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -21,98 +21,56 @@ package org.onap.cps.ncmp.api.inventory.sync - -import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.inventory.CmHandleState -import org.onap.cps.ncmp.api.inventory.CompositeState -import org.onap.cps.ncmp.api.inventory.InventoryPersistence -import org.onap.cps.ncmp.api.inventory.LockReasonCategory -import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder -import spock.lang.Specification -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentMap +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import org.onap.cps.spi.model.DataNode +import spock.lang.Specification class ModuleSyncWatchdogSpec extends Specification { - def mockInventoryPersistence = Mock(InventoryPersistence) - def mockSyncUtils = Mock(SyncUtils) - def mockModuleSyncService = Mock(ModuleSyncService) + def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE - def stubbedMap = Stub(ConcurrentMap) + BlockingQueue moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity) - def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler) + def moduleSyncStartedOnCmHandles = [:] - def cmHandleState = CmHandleState.ADVISED + def mockModuleSyncTasks = Mock(ModuleSyncTasks) - def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap, mockLcmEventsCmHandleStateHandler) + def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks) - def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles'() { - given: 'cm handles in an advised state and a data sync state' - def compositeState1 = new CompositeState(cmHandleState: cmHandleState) - def compositeState2 = new CompositeState(cmHandleState: cmHandleState) - def yangModelCmHandle1 = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState1) - def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2) - and: 'sync utilities return a cm handle twice' - mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2] - when: 'module sync poll is executed' - objectUnderTest.executeAdvisedCmHandlePoll() - then: 'the inventory persistence cm handle returns a composite state for the first cm handle' - 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState1 - and: 'module sync service deletes schema set of cm handle if it exists' - 1 * mockModuleSyncService.deleteSchemaSetIfExists(yangModelCmHandle1) - and: 'module sync service syncs the first cm handle and creates a schema set' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle1) - then: 'the state handler is called for the first cm handle' - 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle1, CmHandleState.READY) - and: 'the inventory persistence cm handle returns a composite state for the second cm handle' - mockInventoryPersistence.getCmHandleState('some-cm-handle-2') >> compositeState2 - and: 'module sync service syncs the second cm handle and creates a schema set' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle2) - then: 'the state handler is called for the second cm handle' - 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle2, CmHandleState.READY) + def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() { + given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles' + mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) + when: ' module sync is started' + objectUnderTest.moduleSyncAdvisedCmHandles() + then: 'it performs #expectedNumberOfTaskExecutions tasks' + expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_) + where: + scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions + 'less then 1 batch' | 1 || 1 + 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 + '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 + 'queue capacity' | testQueueCapacity || 3 + 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 } - def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handle with failure'() { - given: 'cm handles in an advised state' - def compositeState = new CompositeState(cmHandleState: cmHandleState) - def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState) - and: 'sync utilities return a cm handle' - mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle] - when: 'module sync poll is executed' - objectUnderTest.executeAdvisedCmHandlePoll() - then: 'the inventory persistence cm handle returns a composite state for the cm handle' - 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState - and: 'module sync service attempts to sync the cm handle and throws an exception' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') } - and: 'update lock reason, details and attempts is invoked' - 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(compositeState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception') - and: 'the state handler is called to update the state to LOCKED' - 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.LOCKED) + def 'Reset failed cm handles.'() { + given: 'sync utilities returns failed cm handles' + def failedCmHandles = [new YangModelCmHandle()] + mockSyncUtils.getModuleSyncFailedCmHandles() >> failedCmHandles + when: ' reset failed cm handles is started' + objectUnderTest.resetPreviouslyFailedCmHandles() + then: 'it is delegated to the module sync task (service)' + 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles) } - def 'Schedule a Cm-Handle Sync with condition #scenario '() { - given: 'cm handles in an locked state' - def compositeState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED) - .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build() - def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState) - and: 'sync utilities return a cm handle twice' - mockSyncUtils.getModuleSyncFailedCmHandles() >> [yangModelCmHandle, yangModelCmHandle] - and: 'inventory persistence returns the composite state of the cm handle' - mockInventoryPersistence.getCmHandleState(yangModelCmHandle.getId()) >> compositeState - and: 'sync utils retry locked cm handle returns #isReadyForRetry' - mockSyncUtils.isReadyForRetry(compositeState) >>> isReadyForRetry - when: 'module sync poll is executed' - objectUnderTest.executeLockedCmHandlePoll() - then: 'the first cm handle is updated to state "ADVISED" from "READY"' - expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.ADVISED) - where: - scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState - 'retry locked cm handle once' | [true, false] || 1 - 'retry locked cm handle twice' | [true, true] || 2 - 'do not retry locked cm handle' | [false, false] || 0 + def createDataNodes(numberOfDataNodes) { + def dataNodes = [] + (1..numberOfDataNodes).each {dataNodes.add(new DataNode())} + return dataNodes } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy index 52fb110b33..6ccdcf12d3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy @@ -47,8 +47,6 @@ import java.util.stream.Collectors class SyncUtilsSpec extends Specification{ - def mockInventoryPersistence = Mock(InventoryPersistence) - def mockCmHandleQueries = Mock(CmHandleQueries) def mockDmiDataOperations = Mock(DmiDataOperations) @@ -63,28 +61,14 @@ class SyncUtilsSpec extends Specification{ @Shared def dataNode = new DataNode(leaves: ['id': 'cm-handle-123']) - @Shared - def dataNodeAdditionalProperties = new DataNode(leaves: ['name': 'dmiProp1', 'value': 'dmiValue1']) - def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() { given: 'the inventory persistence service returns a collection of data nodes' mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection - and: 'we have some additional (dmi, private) properties' - dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]' - dataNode.childDataNodes = [dataNodeAdditionalProperties] when: 'get advised cm handles are fetched' def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles() then: 'the returned data node collection is the correct size' yangModelCmHandles.size() == expectedDataNodeSize - and: 'if there is a data node the additional (dmi, private) properties are included' - if (expectedDataNodeSize > 0) { - assert yangModelCmHandles[0].dmiProperties[0].name == 'dmiProp1' - assert yangModelCmHandles[0].dmiProperties[0].value == 'dmiValue1' - } - and: 'yang model collection contains the correct data' - yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) == - dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet()) where: 'the following scenarios are used' scenario | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize 'exists' | [dataNode] || 1 | 1 diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy new file mode 100644 index 0000000000..20d384fa53 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.utils + +import org.onap.cps.ncmp.api.impl.utils.YangDataConverter +import org.onap.cps.spi.model.DataNode +import spock.lang.Specification + +class YangDataConverterSpec extends Specification{ + + def 'Convert a cm handle data node with private properties.'() { + given: 'a datanode with some additional (dmi, private) properties' + def dataNodeAdditionalProperties = new DataNode(xpath:'/additional-properties[@name="dmiProp1"]', + leaves: ['name': 'dmiProp1', 'value': 'dmiValue1']) + def dataNode = new DataNode(childDataNodes:[dataNodeAdditionalProperties]) + when: 'the dataNode is converted' + def yangModelCmHandle = YangDataConverter.convertCmHandleToYangModel(dataNode,'sample-id') + then: 'the converted object has the correct id' + assert yangModelCmHandle.id == 'sample-id' + and: 'the additional (dmi, private) properties are included' + assert yangModelCmHandle.dmiProperties[0].name == 'dmiProp1' + assert yangModelCmHandle.dmiProperties[0].value == 'dmiValue1' + } +} diff --git a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java index d80306bae8..c77daafb38 100644 --- a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java +++ b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java @@ -22,6 +22,7 @@ package org.onap.cps.spi.model; +import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -33,7 +34,9 @@ import lombok.Setter; @Setter(AccessLevel.PROTECTED) @Getter @EqualsAndHashCode -public class DataNode { +public class DataNode implements Serializable { + + private static final long serialVersionUID = 1482619410918597467L; DataNode() { } -- cgit 1.2.3-korg