From 9697e76c319e4cf59fc494216a720393545503a9 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Tue, 13 Sep 2022 19:00:59 +0100 Subject: handle partial failure on batch state update - catching of failures on retry of individual nodes on batch update - test scenarios for the same Issue-ID: CPS-1232 Issue-ID: CPS-1126 Change-Id: I9dc13e7bbe44673f8ac14fbde08a85d6a5142487 Signed-off-by: mpriyank --- .../spi/impl/CpsDataPersistenceServiceImpl.java | 33 +++++++++++++++----- .../spi/impl/CpsDataPersistenceServiceSpec.groovy | 36 ++++++++++++++++------ .../cps/spi/exceptions/ConcurrencyException.java | 4 +-- 3 files changed, 54 insertions(+), 19 deletions(-) diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java index d62421c5af..c13422dc4d 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java @@ -234,8 +234,8 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService } catch (final PathParsingException e) { throw new CpsPathException(e.getMessage()); } - return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, - normalizedXpath); + + return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, normalizedXpath); } } @@ -345,8 +345,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService } catch (final StaleStateException staleStateException) { throw new ConcurrencyException("Concurrent Transactions", String.format("dataspace :'%s', Anchor : '%s' and xpath: '%s' is updated by another transaction.", - dataspaceName, anchorName, dataNode.getXpath()), - staleStateException); + dataspaceName, anchorName, dataNode.getXpath())); } } @@ -354,6 +353,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName, final List dataNodes) { + final Map dataNodeFragmentEntityMap = dataNodes.stream() .collect(Collectors.toMap( dataNode -> dataNode, dataNode -> getFragmentByXpath(dataspaceName, anchorName, dataNode.getXpath()))); @@ -362,10 +362,27 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService try { fragmentRepository.saveAll(dataNodeFragmentEntityMap.values()); } catch (final StaleStateException staleStateException) { - throw new ConcurrencyException("Concurrent Transactions", - String.format("A data node in dataspace :'%s' with Anchor : '%s' is updated by another transaction.", - dataspaceName, anchorName), - staleStateException); + retryUpdateDataNodesIndividually(dataspaceName, anchorName, dataNodeFragmentEntityMap.values()); + } + } + + private void retryUpdateDataNodesIndividually(final String dataspaceName, final String anchorName, + final Collection fragmentEntities) { + final Collection failedXpaths = new HashSet<>(); + + fragmentEntities.forEach(dataNodeFragment -> { + try { + fragmentRepository.save(dataNodeFragment); + } catch (final StaleStateException e) { + failedXpaths.add(dataNodeFragment.getXpath()); + } + }); + + if (!failedXpaths.isEmpty()) { + final String failedXpathsConcatenated = String.join(",", failedXpaths); + throw new ConcurrencyException("Concurrent Transactions", String.format( + "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.", + failedXpathsConcatenated, dataspaceName, anchorName)); } } diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy index 1bbf358e54..470b03afdf 100644 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy @@ -86,17 +86,25 @@ class CpsDataPersistenceServiceSpec extends Specification { assert concurrencyException.getDetails().contains('/some/xpath') } - def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() { - given: 'the fragment repository returns a list of fragment entities' - mockFragmentRepository.getByDataspaceAndAnchorAndXpath(*_) >> new FragmentEntity() - and: 'a data node is concurrently updated by another transaction' + def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() { + given: 'the system contains and can update one datanode' + def dataNode1 = mockDataNodeAndFragmentEntity('/node1', 'OK') + and: 'the system contains two more datanodes that throw an exception while updating' + def dataNode2 = mockDataNodeAndFragmentEntity('/node2', 'EXCEPTION') + def dataNode3 = mockDataNodeAndFragmentEntity('/node3', 'EXCEPTION') + and: 'the batch update will therefore also fail' mockFragmentRepository.saveAll(*_) >> { throw new StaleStateException("concurrent updates") } - when: 'attempt to update data node with submitted data nodes' - objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', []) + when: 'attempt batch update data nodes' + objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [dataNode1, dataNode2, dataNode3]) then: 'concurrency exception is thrown' - def concurrencyException = thrown(ConcurrencyException) - assert concurrencyException.getDetails().contains('some-dataspace') - assert concurrencyException.getDetails().contains('some-anchor') + def thrown = thrown(ConcurrencyException) + assert thrown.message == 'Concurrent Transactions' + and: 'it does not contain the successfull datanode' + assert !thrown.details.contains('/node1') + and: 'it contains the failed datanodes' + assert thrown.details.contains('/node2') + assert thrown.details.contains('/node3') + } def 'Retrieving a data node with a property JSON value of #scenario'() { @@ -193,4 +201,14 @@ class CpsDataPersistenceServiceSpec extends Specification { assert fragmentEntities.size() == 2 }}) } + + def mockDataNodeAndFragmentEntity(xpath, scenario) { + def dataNode = new DataNodeBuilder().withXpath(xpath).build() + def fragmentEntity = new FragmentEntity(xpath: xpath, childFragments: []) + mockFragmentRepository.getByDataspaceAndAnchorAndXpath(_, _, xpath) >> fragmentEntity + if ('EXCEPTION' == scenario) { + mockFragmentRepository.save(fragmentEntity) >> { throw new StaleStateException("concurrent updates") } + } + return dataNode + } } \ No newline at end of file diff --git a/cps-service/src/main/java/org/onap/cps/spi/exceptions/ConcurrencyException.java b/cps-service/src/main/java/org/onap/cps/spi/exceptions/ConcurrencyException.java index 3a8a94b979..b5eae93b32 100644 --- a/cps-service/src/main/java/org/onap/cps/spi/exceptions/ConcurrencyException.java +++ b/cps-service/src/main/java/org/onap/cps/spi/exceptions/ConcurrencyException.java @@ -20,8 +20,8 @@ package org.onap.cps.spi.exceptions; public class ConcurrencyException extends CpsException { - public ConcurrencyException(final String message, final String details, final Throwable cause) { - super(message, details, cause); + public ConcurrencyException(final String message, final String details) { + super(message, details); } } -- cgit 1.2.3-korg