diff options
author | 2025-01-23 12:34:40 +0000 | |
---|---|---|
committer | 2025-01-23 15:34:05 +0000 | |
commit | 0cb3dec31d5a42ef00e89038aef1625179c8ac00 (patch) | |
tree | ef7028063f090712a3a0692380aecdcf2deb095e | |
parent | ed797d4a340ed145c71ae2cd805b823b26228304 (diff) |
Fix for cm handles stuck in LOCKED during registration
- Additional Error logging when cm handles fail module sync
- Swallow already defined exception upon schema and/or anchor creation
- Updated integration test to try to reproduce the problem (but couldn't)
- Ignored integration tests that depend/affected by race conditions
(they are useful for troubleshooting but not for pipeline checks)
- Removed last remnants of springboot retry annotation option (incl dependencies)
Issue-ID: CPS-2576
Change-Id: I910e802268332f955134c043bd1b46a7ec57233b
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
8 files changed, 72 insertions, 47 deletions
diff --git a/cps-application/src/main/java/org/onap/cps/Application.java b/cps-application/src/main/java/org/onap/cps/Application.java index 053139fcc8..62103bf368 100644 --- a/cps-application/src/main/java/org/onap/cps/Application.java +++ b/cps-application/src/main/java/org/onap/cps/Application.java @@ -22,9 +22,7 @@ package org.onap.cps; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.retry.annotation.EnableRetry;
-@EnableRetry
@SpringBootApplication
public class Application {
public static void main(final String[] args) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java index 041daa0927..6c1dc731c6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java @@ -35,6 +35,8 @@ import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsAnchorService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsModuleService; +import org.onap.cps.api.exceptions.AlreadyDefinedException; +import org.onap.cps.api.exceptions.DuplicatedYangResourceException; import org.onap.cps.api.model.ModuleReference; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.onap.cps.utils.ContentType; @@ -68,7 +70,12 @@ public class ModuleSyncService { final String moduleSetTag = yangModelCmHandle.getModuleSetTag(); final String schemaSetName = getSchemaSetName(cmHandleId, moduleSetTag); syncAndCreateSchemaSet(yangModelCmHandle, schemaSetName); - cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, cmHandleId); + try { + cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, cmHandleId); + } catch (final AlreadyDefinedException alreadyDefinedException) { + log.warn("Ignoring (anchor) already exist exception for {}. Exception details: ", + yangModelCmHandle.getId(), alreadyDefinedException); + } } /** @@ -99,14 +106,27 @@ public class ModuleSyncService { private void syncAndCreateSchemaSet(final YangModelCmHandle yangModelCmHandle, final String schemaSetName) { if (isNewSchemaSet(schemaSetName)) { final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle); - log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId()); - cpsModuleService.createSchemaSetFromModules( + try { + log.info("Creating Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId()); + cpsModuleService.createSchemaSetFromModules( NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetName, moduleDelta.newModuleNameToContentMap, moduleDelta.allModuleReferences - ); - log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName, yangModelCmHandle.getId()); + ); + log.info("Successfully created Schema Set {} for CM Handle {}", schemaSetName, + yangModelCmHandle.getId()); + } catch (final AlreadyDefinedException alreadyDefinedException) { + log.warn("Ignoring already exist (schema set) exception for {}. Exception details: ", + yangModelCmHandle.getId(), alreadyDefinedException); + } catch (final DuplicatedYangResourceException duplicatedYangResourceException) { + log.warn("Duplicate Yang Resource {} creation for {}. " + + "CM Handle will be LOCKED (for retry). Exception details: ", + duplicatedYangResourceException.getName(), + yangModelCmHandle.getId(), + duplicatedYangResourceException); + throw duplicatedYangResourceException; + } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index 40404b719a..b63496753e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -114,7 +114,7 @@ public class ModuleSyncTasks { compositeState.setLockReason(null); return CmHandleState.READY; } catch (final Exception e) { - log.warn("Processing of {} failed,reason : {}.", yangModelCmHandle.getId(), e.getMessage()); + log.warn("Processing of {} failed,reason : ", yangModelCmHandle.getId(), e); final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED : LockReasonCategory.MODULE_SYNC_FAILED; diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy index 7881375762..868609e282 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy @@ -90,21 +90,45 @@ class ModuleSyncServiceSpec extends Specification { 'without' | '' } - def 'Attempt Sync models for a cm handle with existing schema set (#originalException).'() { + def 'Sync models for a cm handle with already defined exception upon schema set creation.'() { given: 'a cm handle to be synced' def yangModelCmHandle = createAdvisedCmHandle('existing tag') and: 'dmi returns no new yang resources' mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:] and: 'already defined exception occurs when creating schema (existing)' + mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw AlreadyDefinedException.forSchemaSet('', '', null) } + when: 'module sync is triggered' + objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle) + then: 'the exception is ignored' + noExceptionThrown() + } + + def 'Sync models for a cm handle with already defined exception upon anchor set creation.'() { + given: 'a cm handle to be synced' + def yangModelCmHandle = createAdvisedCmHandle('existing tag') + and: 'dmi returns no new yang resources' + mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:] + and: 'already defined exception occurs when creating schema (existing)' + mockCpsAnchorService.createAnchor(*_) >> { throw AlreadyDefinedException.forAnchor('', '', null) } + when: 'module sync is triggered' + objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle) + then: 'the exception is ignored' + noExceptionThrown() + } + + def 'Attempt Sync models for a cm handle with duplicate yang resources exception).'() { + given: 'a cm handle to be synced' + def yangModelCmHandle = createAdvisedCmHandle('existing tag') + and: 'dmi returns no new yang resources' + mockDmiModelOperations.getNewYangResourcesFromDmi(*_) >> [:] + and: 'duplicate yang resource exception occurs when creating schema' + def originalException = new DuplicatedYangResourceException('', '', null) mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw originalException } when: 'module sync is triggered' objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle) then: 'same exception is thrown up' def thrownException = thrown(Exception) assert thrownException == originalException - where: 'following exceptions occur' - originalException << [AlreadyDefinedException.forSchemaSet('', '', null), - new DuplicatedYangResourceException('', '', null) ] } def 'Model upgrade without using Module Set Tags (legacy) where the modules are in database.'() { diff --git a/cps-rest/pom.xml b/cps-rest/pom.xml index b04daf03bd..f72cef9f28 100644 --- a/cps-rest/pom.xml +++ b/cps-rest/pom.xml @@ -62,10 +62,6 @@ <artifactId>spring-boot-starter-jetty</artifactId> </dependency> <dependency> - <groupId>org.springframework.retry</groupId> - <artifactId>spring-retry</artifactId> - </dependency> - <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> </dependency> diff --git a/cps-ri/pom.xml b/cps-ri/pom.xml index 2492cb837e..f065421ec9 100644 --- a/cps-ri/pom.xml +++ b/cps-ri/pom.xml @@ -56,10 +56,6 @@ <artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework.retry</groupId>
- <artifactId>spring-retry</artifactId>
- </dependency>
- <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
diff --git a/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java index 4f7492ff26..aaf6165471 100755 --- a/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java @@ -70,8 +70,6 @@ import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.parser.api.YangSyntaxErrorException; import org.opendaylight.yangtools.yang.parser.rfc7950.repo.YangModelDependencyInfo; import org.springframework.dao.DataIntegrityViolationException; -import org.springframework.retry.RetryContext; -import org.springframework.retry.support.RetrySynchronizationManager; import org.springframework.stereotype.Component; @Slf4j @@ -269,18 +267,10 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ yangResourceRepository.saveAll(newYangResourceEntities); } catch (final DataIntegrityViolationException dataIntegrityViolationException) { // Throw a CPS duplicated Yang resource exception if the cause of the error is a yang checksum - // database constraint violation. - // If it is not, then throw the original exception + // database constraint violation. If it is not, then throw the original exception final Optional<DuplicatedYangResourceException> convertedException = convertToDuplicatedYangResourceException( dataIntegrityViolationException, newYangResourceEntities); - convertedException.ifPresent( - e -> { - final RetryContext retryContext = RetrySynchronizationManager.getContext(); - int retryCount = retryContext == null ? 0 : retryContext.getRetryCount(); - log.warn("Cannot persist duplicated yang resource. System will attempt this method " - + "up to 5 times. Current retry count : {}", ++retryCount, e); - }); throw convertedException.isPresent() ? convertedException.get() : dataIntegrityViolationException; } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy index 9cb8c29e21..81fbc3c4f7 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy @@ -69,7 +69,7 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1) } - /** this test has intermittent failures, due to timeouts. + /** this test has intermittent failures, due to race conditions * Ignored but left here as it might be valuable to further optimization investigations. **/ @Ignore @@ -79,7 +79,6 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { def cmHandlesPerTag = 250 def totalCmHandles = numberOfTags * cmHandlesPerTag def offset = 1 - def minimumBatches = totalCmHandles / 100 registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset) and: 'register anther 250 cm handles with module set tag cps-2478-B' offset += cmHandlesPerTag @@ -89,23 +88,21 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { when: 'sync all advised cm handles' objectUnderTest.moduleSyncAdvisedCmHandles() Thread.sleep(100) - then: 'retry until both schema sets are stored in db (1 schema set for each module set tag)' - def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer() - new PollingConditions().within(10, () -> { - objectUnderTest.moduleSyncAdvisedCmHandles() - Thread.sleep(100) - assert dbSchemaSetStorageTimer.count() == 2 - }) - then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)' - def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer() + then: 'Keep processing until there are no more LOCKED or ADVISED cm handles' new PollingConditions().within(10, () -> { - assert dbStateUpdateTimer.count() >= minimumBatches + def advised = cmHandlesByState.get('advisedCmHandlesCount') + def locked = cmHandlesByState.get('lockedCmHandlesCount') + if ( locked > 0 | advised > 0 ) { + println "CPS-2576 Need to retry ${locked} LOCKED / ${advised} ADVISED cm Handles" + objectUnderTest.moduleSyncAdvisedCmHandles() + Thread.sleep(100) + } + assert cmHandlesByState.get('lockedCmHandlesCount') + cmHandlesByState.get('advisedCmHandlesCount') == 0 }) - and: 'one call to DMI per module set tag to get module references (may be more due to parallel processing of batches)' - def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer() - assert dmiModuleRetrievalTimer.count() >= numberOfTags && dmiModuleRetrievalTimer.count() <= minimumBatches - and: 'log the relevant instrumentation' + def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer() + def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer() + def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer() logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI ') logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets ') logInstrumentation(dbStateUpdateTimer, 'batch state updates ') @@ -134,6 +131,10 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase { deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1) } + /** this test has intermittent failures, due to race conditions + * Ignored but left here as it might be valuable to further optimization investigations. + **/ + @Ignore def 'Schema sets with overlapping modules processed at the same time (DB constraint violation).'() { given: 'register one batch (100) cm handles of tag A (with overlapping module names)' registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 100, 1, ModuleNameStrategy.OVERLAPPING) |