diff options
Diffstat (limited to 'cps-ncmp-service')
14 files changed, 202 insertions, 88 deletions
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml index 9ce9d51363..fda4221f6f 100644 --- a/cps-ncmp-service/pom.xml +++ b/cps-ncmp-service/pom.xml @@ -27,7 +27,7 @@ <parent> <groupId>org.onap.cps</groupId> <artifactId>cps-parent</artifactId> - <version>3.5.4-SNAPSHOT</version> + <version>3.5.5-SNAPSHOT</version> <relativePath>../cps-parent/pom.xml</relativePath> </parent> @@ -74,10 +74,6 @@ <artifactId>cps-path-parser</artifactId> </dependency> <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>annotations</artifactId> - </dependency> - <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast-spring</artifactId> </dependency> diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java index 5331c13bd1..e9e5f5499f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java @@ -62,15 +62,16 @@ public class NetworkCmProxyInventoryFacade { private final TrustLevelManager trustLevelManager; private final AlternateIdMatcher alternateIdMatcher; + + /** * Registration of Created, Removed, Updated or Upgraded CM Handles. * * @param dmiPluginRegistration Dmi Plugin Registration details * @return dmiPluginRegistrationResponse */ - public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule( - final DmiPluginRegistration dmiPluginRegistration) { - return cmHandleRegistrationService.updateDmiRegistrationAndSyncModule(dmiPluginRegistration); + public DmiPluginRegistrationResponse updateDmiRegistration(final DmiPluginRegistration dmiPluginRegistration) { + return cmHandleRegistrationService.updateDmiRegistration(dmiPluginRegistration); } /** diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java index ad8025b5dc..109a541cb3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java @@ -57,13 +57,13 @@ public class HazelcastCacheConfig { final Config config = getHazelcastInstanceConfig(instanceConfigName); config.setClusterName(clusterName); config.setClassLoader(org.onap.cps.spi.model.Dataspace.class.getClassLoader()); - dataStructuresConfig(namedConfig, config); + configureDataStructures(namedConfig, config); exposeClusterInformation(config); updateDiscoveryMode(config); return config; } - private static void dataStructuresConfig(final NamedConfig namedConfig, final Config config) { + private static void configureDataStructures(final NamedConfig namedConfig, final Config config) { if (namedConfig instanceof MapConfig) { config.addMapConfig((MapConfig) namedConfig); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java index d9f7e38993..cb55b09d41 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java @@ -87,8 +87,7 @@ public class CmHandleRegistrationService { * @param dmiPluginRegistration Dmi Plugin Registration details * @return dmiPluginRegistrationResponse */ - public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule( - final DmiPluginRegistration dmiPluginRegistration) { + public DmiPluginRegistrationResponse updateDmiRegistration(final DmiPluginRegistration dmiPluginRegistration) { dmiPluginRegistration.validateDmiPluginRegistration(); final DmiPluginRegistrationResponse dmiPluginRegistrationResponse = new DmiPluginRegistrationResponse(); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index c6deb79d4d..e627f8f894 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -95,22 +95,21 @@ public class ModuleSyncTasks { } /** - * Resets the state of failed CM handles and updates their status to ADVISED for retry. - - * This method processes a collection of failed CM handles, logs their lock reason, and resets their state + * Set the state of CM handles to ADVISED. + * This method processes a collection of CM handles, logs their lock reason, and resets their state * to ADVISED. Once reset, it updates the CM handle states in a batch to allow for re-attempt by the module-sync * watchdog. * - * @param failedCmHandles a collection of CM handles that have failed and need their state reset + * @param yangModelCmHandles a collection of CM handles that needs their state reset */ - public void resetFailedCmHandles(final Collection<YangModelCmHandle> failedCmHandles) { - final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size()); - for (final YangModelCmHandle failedCmHandle : failedCmHandles) { - final CompositeState compositeState = failedCmHandle.getCompositeState(); - final String resetCmHandleId = failedCmHandle.getId(); + public void setCmHandlesToAdvised(final Collection<YangModelCmHandle> yangModelCmHandles) { + final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(yangModelCmHandles.size()); + for (final YangModelCmHandle yangModelCmHandle : yangModelCmHandles) { + final CompositeState compositeState = yangModelCmHandle.getCompositeState(); + final String resetCmHandleId = yangModelCmHandle.getId(); log.debug("Resetting CM handle {} state to ADVISED for retry by the module-sync watchdog. Lock reason: {}", - failedCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name()); - cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED); + yangModelCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name()); + cmHandleStatePerCmHandle.put(yangModelCmHandle, CmHandleState.ADVISED); removeResetCmHandleFromModuleSyncMap(resetCmHandleId); } lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java index bc7d6cdf67..898b8d5bf4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java @@ -27,10 +27,12 @@ import java.util.HashSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; +import org.onap.cps.ncmp.impl.utils.Sleeper; import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -45,6 +47,9 @@ public class ModuleSyncWatchdog { private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; private final AsyncTaskExecutor asyncTaskExecutor; + private final Lock workQueueLock; + private final Sleeper sleeper; + private static final int MODULE_SYNC_BATCH_SIZE = 100; private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; @@ -56,11 +61,12 @@ public class ModuleSyncWatchdog { * Check DB for any cm handles in 'ADVISED' state. * Queue and create batches to process them asynchronously. * This method will only finish when there are no more 'ADVISED' cm handles in the DB. - * This method wil be triggered on a configurable interval + * This method is triggered on a configurable interval (ncmp.timers.advised-modules-sync.sleep-time-ms) */ - @Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}") + @Scheduled(initialDelayString = "${test.ncmp.timers.advised-modules-sync.initial-delay-ms:0}", + fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { - log.info("Processing module sync watchdog waking up."); + log.debug("Processing module sync watchdog waking up."); populateWorkQueueIfNeeded(); while (!moduleSyncWorkQueue.isEmpty()) { if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) { @@ -80,36 +86,50 @@ public class ModuleSyncWatchdog { } /** - * Find any failed (locked) cm handles and change state back to 'ADVISED'. + * Populate work queue with advised cm handles from db. + * This method is made public for (integration) testing purposes. + * So it can be tested without the queue being emptied immediately as the main public method does. */ - @Scheduled(fixedDelayString = "${ncmp.timers.locked-modules-sync.sleep-time-ms:15000}") - public void resetPreviouslyFailedCmHandles() { - log.info("Processing module sync retry-watchdog waking up."); - final Collection<YangModelCmHandle> failedCmHandles - = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade(); - log.info("Retrying {} cmHandles", failedCmHandles.size()); - moduleSyncTasks.resetFailedCmHandles(failedCmHandles); + public void populateWorkQueueIfNeeded() { + if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) { + try { + populateWorkQueue(); + if (moduleSyncWorkQueue.isEmpty()) { + setPreviouslyLockedCmHandlesToAdvised(); + } + } finally { + workQueueLock.unlock(); + } + } } - private void preventBusyWait() { - try { - log.info("Busy waiting now"); - TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); + private void populateWorkQueue() { + final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles(); + if (advisedCmHandles.isEmpty()) { + log.debug("No advised CM handles found in DB."); + } else { + log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size()); + advisedCmHandles.forEach(advisedCmHandle -> { + final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id")); + if (moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.info("CM handle {} added to the work queue.", cmHandleId); + } else { + log.warn("Failed to add CM handle {} to the work queue.", cmHandleId); + } + }); + log.info("Work queue contains {} items.", moduleSyncWorkQueue.size()); } } - private void populateWorkQueueIfNeeded() { - if (moduleSyncWorkQueue.isEmpty()) { - final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles(); - log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size()); - for (final DataNode advisedCmHandle : advisedCmHandles) { - if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { - log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); - } - } - log.info("Work Queue Size : {}", moduleSyncWorkQueue.size()); + private void setPreviouslyLockedCmHandlesToAdvised() { + final Collection<YangModelCmHandle> lockedCmHandles + = moduleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade(); + if (lockedCmHandles.isEmpty()) { + log.debug("No locked CM handles found in DB."); + } else { + log.info("Found {} Locked CM Handles. Changing state to Advise to retry syncing them again.", + lockedCmHandles.size()); + moduleSyncTasks.setCmHandlesToAdvised(lockedCmHandles); } } @@ -130,8 +150,16 @@ public class ModuleSyncWatchdog { nextBatch.add(batchCandidate); } } - log.debug("nextBatch size : {}", nextBatch.size()); + log.info("nextBatch size : {}", nextBatch.size()); return nextBatch; } + private void preventBusyWait() { + try { + log.debug("Busy waiting now"); + sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java index c5fae0d166..1f33cc349d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java @@ -24,6 +24,7 @@ import com.hazelcast.config.MapConfig; import com.hazelcast.config.QueueConfig; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig; import org.onap.cps.spi.model.DataNode; @@ -43,6 +44,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig"); private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig"); private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); + private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock"; /** * Module Sync Distributed Queue Instance. @@ -74,4 +76,21 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig { public IMap<String, Boolean> dataSyncSemaphores() { return getOrCreateHazelcastInstance(dataSyncSemaphoresConfig).getMap("dataSyncSemaphores"); } + + /** + * Retrieves a distributed lock used to control access to the work queue for module synchronization. + * This lock ensures that the population and modification of the work queue are thread-safe and + * protected from concurrent access across different nodes in the distributed system. + * The lock guarantees that only one instance of the application can populate or modify the + * module sync work queue at a time, preventing race conditions and potential data inconsistencies. + * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides + * strong consistency guarantees for distributed operations. + * + * @return a {@link Lock} instance used for synchronizing access to the work queue. + */ + @Bean + public Lock workQueueLock() { + // TODO Method below does not use commonQueueConfig for creating lock (Refactor later) + return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java new file mode 100644 index 0000000000..7a02fa06e0 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.utils; + +import java.util.concurrent.TimeUnit; +import org.springframework.stereotype.Service; + +/** + * This class is to extract out sleep functionality so the interrupted exception handling can + * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage. + */ +@Service +public class Sleeper { + public void haveALittleRest(final long timeInMillis) throws InterruptedException { + TimeUnit.MILLISECONDS.sleep(timeInMillis); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java index d8e8350345..8ae942eb7b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java @@ -20,7 +20,6 @@ package org.onap.cps.ncmp.impl.utils.http; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.channel.ChannelOption; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; @@ -63,7 +62,6 @@ public class WebClientConfiguration { .compress(true); } - @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE") private static ConnectionProvider getConnectionProvider(final ServiceConfig serviceConfig) { return ConnectionProvider.builder(serviceConfig.getConnectionProviderName()) .maxConnections(serviceConfig.getMaximumConnectionsTotal()) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy index dc38e0fc9b..0bd838437d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy @@ -29,10 +29,10 @@ class HazelcastCacheConfigSpec extends Specification { def objectUnderTest = new HazelcastCacheConfig() def 'Create Hazelcast instance with a #scenario'() { - given: 'a cluster name and instance name' + given: 'a cluster name and instance config name' objectUnderTest.clusterName = 'my cluster' objectUnderTest.instanceConfigName = 'my instance config' - when: 'an hazelcast instance is created (name has to be unique)' + when: 'a hazelcast instance is created (name has to be unique)' def result = objectUnderTest.getOrCreateHazelcastInstance(config) then: 'the instance is created and has the correct name' assert result.name == 'my instance config' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy index dcff2e9b89..70e26d993c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy @@ -87,7 +87,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'cm handle is in READY state' mockCmHandleQueries.cmHandleHasState('cmhandle-3', CmHandleState.READY) >> true when: 'registration is processed' - objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration) + objectUnderTest.updateDmiRegistration(dmiRegistration) then: 'cm-handles are removed first' 1 * objectUnderTest.processRemovedCmHandles(*_) and: 'de-registered cm handle entry is removed from in progress map' @@ -108,7 +108,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'exception while checking cm handle state' mockInventoryPersistence.getYangModelCmHandle('cmhandle-3') >> new YangModelCmHandle(id: 'cmhandle-3', moduleSetTag: '', compositeState: new CompositeState(cmHandleState: cmHandleState)) when: 'registration is processed' - def result = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration) + def result = objectUnderTest.updateDmiRegistration(dmiRegistration) then: 'upgrade operation contains expected error code' assert result.upgradedCmHandles[0].status == expectedResponseStatus where: 'the following parameters are used' @@ -124,7 +124,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'exception while checking cm handle state' mockInventoryPersistence.getYangModelCmHandle('cmhandle-3') >> { throw exception } when: 'registration is processed' - def result = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration) + def result = objectUnderTest.updateDmiRegistration(dmiRegistration) then: 'upgrade operation contains expected error code' assert result.upgradedCmHandles.ncmpResponseStatus.code[0] == expectedErrorCode where: 'the following parameters are used' @@ -139,7 +139,7 @@ class CmHandleRegistrationServiceSpec extends Specification { dmiDataPlugin: dmiDataPlugin) dmiPluginRegistration.createdCmHandles = [ncmpServiceCmHandle] when: 'update registration and sync module is called with correct DMI plugin information' - objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'create cm handles registration and sync modules is called with the correct plugin information' 1 * objectUnderTest.processCreatedCmHandles(dmiPluginRegistration, _) where: @@ -155,7 +155,7 @@ class CmHandleRegistrationServiceSpec extends Specification { dmiDataPlugin: dmiDataPlugin) dmiPluginRegistration.createdCmHandles = [ncmpServiceCmHandle] when: 'registration is called with incorrect DMI plugin information' - objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'a DMI Request Exception is thrown with correct message details' def exceptionThrown = thrown(DmiRequestException.class) assert exceptionThrown.getMessage().contains(expectedMessageDetails) @@ -178,7 +178,7 @@ class CmHandleRegistrationServiceSpec extends Specification { def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: 'my-server') dmiPluginRegistration.createdCmHandles = [new NcmpServiceCmHandle(cmHandleId: 'cmhandle', dmiProperties: dmiProperties, publicProperties: publicProperties)] when: 'registration is updated' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'a successful response is received' response.createdCmHandles.size() == 1 with(response.createdCmHandles[0]) { @@ -206,7 +206,7 @@ class CmHandleRegistrationServiceSpec extends Specification { def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: 'my-server', createdCmHandles:[new NcmpServiceCmHandle(cmHandleId: 'ch-1', registrationTrustLevel: registrationTrustLevel)]) when: 'registration is updated' - objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'trustLevel is set for the created cm-handle' 1 * mockTrustLevelManager.registerCmHandles(expectedMapping) where: @@ -225,7 +225,7 @@ class CmHandleRegistrationServiceSpec extends Specification { def xpath = "somePathWithId[@id='cmhandle2']" mockLcmEventsCmHandleStateHandler.initiateStateAdvised(*_) >> { throw AlreadyDefinedException.forDataNodes([xpath], 'some-context') } when: 'registration is updated to create cm-handles' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'a response is received for all cm-handles' response.createdCmHandles.size() == 1 and: 'all cm-handles creation fails' @@ -244,7 +244,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'cm-handler registration fails: #scenario' mockLcmEventsCmHandleStateHandler.initiateStateAdvised(*_) >> { throw exception } when: 'registration is updated' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'a failure response is received' response.createdCmHandles.size() == 1 with(response.createdCmHandles[0]) { @@ -269,7 +269,7 @@ class CmHandleRegistrationServiceSpec extends Specification { CmHandleRegistrationResponse.createFailureResponse('cm handle 4', CM_HANDLE_INVALID_ID)] mockNetworkCmProxyDataServicePropertyHandler.updateCmHandleProperties(_) >> updateOperationResponse when: 'registration is updated' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'the response contains updateOperationResponse' assert response.updatedCmHandles.size() == 4 assert response.updatedCmHandles.containsAll(updateOperationResponse) @@ -281,7 +281,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: '#scenario' mockCpsModuleService.deleteSchemaSetsWithCascade(_, ['cmhandle']) >> { if (!schemaSetExist) { throw new SchemaSetNotFoundException('', '') } } when: 'registration is updated to delete cmhandle' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'the cmHandle state is updated to "DELETING"' 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args -> args[0].values()[0] == CmHandleState.DELETING } @@ -315,7 +315,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'cm-handle deletion is successful for 1st and 3rd; failed for 2nd' mockInventoryPersistence.deleteDataNode("/dmi-registry/cm-handles[@id='cmhandle2']") >> { throw new RuntimeException("Failed") } when: 'registration is updated to delete cmhandles' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'the cmHandle states are all updated to "DELETING"' 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch({ assert it.every { entry -> entry.value == CmHandleState.DELETING } }) and: 'a response is received for all cm-handles' @@ -361,7 +361,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'schema set single deletion failed with unknown error' mockInventoryPersistence.deleteSchemaSetWithCascade(_) >> { throw new RuntimeException('Failed') } when: 'registration is updated to delete cmhandle' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'no exception is thrown' noExceptionThrown() and: 'cm-handle is not deleted' @@ -387,7 +387,7 @@ class CmHandleRegistrationServiceSpec extends Specification { and: 'cm-handle deletion fails on individual delete' mockInventoryPersistence.deleteDataNode(_) >> { throw deleteListElementException } when: 'registration is updated to delete cmhandle' - def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'a failure response is received' assert response.removedCmHandles.size() == 1 with(response.removedCmHandles[0]) { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy index 42f0a08ac3..4c554c6af5 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy @@ -56,9 +56,9 @@ class NetworkCmProxyInventoryFacadeSpec extends Specification { given: 'an (updated) dmi plugin registration' def dmiPluginRegistration = Mock(DmiPluginRegistration) when: 'the registration is submitted ' - objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + objectUnderTest.updateDmiRegistration(dmiPluginRegistration) then: 'the call is delegated to the cm handle registration service' - 1 * mockCmHandleRegistrationService.updateDmiRegistrationAndSyncModule(dmiPluginRegistration) + 1 * mockCmHandleRegistrationService.updateDmiRegistration(dmiPluginRegistration) } def 'Execute cm handle reference search for inventory'() { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy index 160744a7d7..4d715d28c9 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy @@ -136,7 +136,7 @@ class ModuleSyncTasksSpec extends Specification { moduleSyncStartedOnCmHandles.put('cm-handle-1', 'started') moduleSyncStartedOnCmHandles.put('cm-handle-2', 'started') when: 'resetting failed cm handles' - objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2]) + objectUnderTest.setCmHandlesToAdvised([yangModelCmHandle1, yangModelCmHandle2]) then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry' 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(expectedCmHandleStatePerCmHandle) and: 'after reset performed progress map is empty' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy index 155edc8bc6..f2c88a511e 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy @@ -23,14 +23,16 @@ package org.onap.cps.ncmp.impl.inventory.sync import com.hazelcast.map.IMap import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle +import org.onap.cps.ncmp.impl.utils.Sleeper import org.onap.cps.spi.model.DataNode import spock.lang.Specification import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.locks.Lock class ModuleSyncWatchdogSpec extends Specification { - def mockSyncUtils = Mock(ModuleOperationsUtils) + def mockModuleOperationsUtils = Mock(ModuleOperationsUtils) def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE @@ -42,15 +44,23 @@ class ModuleSyncWatchdogSpec extends Specification { def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor) - def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor) + def mockWorkQueueLock = Mock(Lock) + + def spiedSleeper = Spy(Sleeper) + + def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper) void setup() { spiedAsyncTaskExecutor.setupThreadPool() } def 'Module sync advised cm handles with #scenario.'() { - given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles' - mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) + given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles' + mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles) + and: 'module sync utilities returns no failed (locked) cm handles' + mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> [] + and: 'the work queue is not locked' + mockWorkQueueLock.tryLock() >> true and: 'the executor has enough available threads' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3 when: ' module sync is started' @@ -59,6 +69,7 @@ class ModuleSyncWatchdogSpec extends Specification { expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_) where: 'the following parameter are used' scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions + 'none at all' | 0 || 0 'less then 1 batch' | 1 || 1 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1 '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2 @@ -66,9 +77,11 @@ class ModuleSyncWatchdogSpec extends Specification { 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3 } - def 'Module sync advised cm handles starts with no available threads.'() { - given: 'sync utilities returns a advise cm handles' - mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1) + def 'Module sync cm handles starts with no available threads.'() { + given: 'module sync utilities returns a advise cm handles' + mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1) + and: 'the work queue is not locked' + mockWorkQueueLock.tryLock() >> true and: 'the executor first has no threads but has one thread on the second attempt' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ] when: ' module sync is started' @@ -77,9 +90,11 @@ class ModuleSyncWatchdogSpec extends Specification { 1 * spiedAsyncTaskExecutor.executeTask(*_) } - def 'Module sync advised cm handles already handled.'() { - given: 'sync utilities returns a advise cm handles' - mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1) + def 'Module sync advised cm handle already handled by other thread.'() { + given: 'module sync utilities returns an advised cm handle' + mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1) + and: 'the work queue is not locked' + mockWorkQueueLock.tryLock() >> true and: 'the executor has a thread available' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1 and: 'the semaphore cache indicates the cm handle is already being processed' @@ -94,7 +109,7 @@ class ModuleSyncWatchdogSpec extends Specification { given: 'there is still a cm handle in the queue' moduleSyncWorkQueue.offer(new DataNode()) and: 'sync utilities returns many advise cm handles' - mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(500) + mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(500) and: 'the executor has plenty threads available' spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10 when: ' module sync is started' @@ -104,18 +119,42 @@ class ModuleSyncWatchdogSpec extends Specification { } def 'Reset failed cm handles.'() { - given: 'sync utilities returns failed cm handles' + given: 'module sync utilities returns failed cm handles' def failedCmHandles = [new YangModelCmHandle()] - mockSyncUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles + mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles when: 'reset failed cm handles is started' - objectUnderTest.resetPreviouslyFailedCmHandles() + objectUnderTest.setPreviouslyLockedCmHandlesToAdvised() then: 'it is delegated to the module sync task (service)' - 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles) + 1 * mockModuleSyncTasks.setCmHandlesToAdvised(failedCmHandles) + } + + def 'Module Sync Locking.'() { + given: 'module sync utilities returns an advised cm handle' + mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1) + and: 'can lock is : #canLock' + mockWorkQueueLock.tryLock() >> canLock + when: 'attempt to populate the work queue' + objectUnderTest.populateWorkQueueIfNeeded() + then: 'the queue remains empty is #expectQueueRemainsEmpty' + assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty + where: 'the following lock states are applied' + canLock | expectQueueRemainsEmpty + false | true + true | false + } + + def 'Sleeper gets interrupted.'() { + given: 'sleeper gets interrupted' + spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() } + when: 'the watchdog attempts to sleep to save cpu cycles' + objectUnderTest.preventBusyWait() + then: 'no exception is thrown' + noExceptionThrown() } def createDataNodes(numberOfDataNodes) { def dataNodes = [] - (1..numberOfDataNodes).each {dataNodes.add(new DataNode())} + numberOfDataNodes.times { dataNodes.add(new DataNode()) } return dataNodes } } |