aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ri/src
diff options
context:
space:
mode:
authormpriyank <priyank.maheshwari@est.tech>2022-09-13 19:00:59 +0100
committermpriyank <priyank.maheshwari@est.tech>2022-09-13 21:42:14 +0100
commit9697e76c319e4cf59fc494216a720393545503a9 (patch)
tree5ccc5b0be1f3b9ee787a222af6cd1d21824598c5 /cps-ri/src
parent86b5cee2920672726bd66df0775198f57f29b8cc (diff)
handle partial failure on batch state update
- catching of failures on retry of individual nodes on batch update - test scenarios for the same Issue-ID: CPS-1232 Issue-ID: CPS-1126 Change-Id: I9dc13e7bbe44673f8ac14fbde08a85d6a5142487 Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'cps-ri/src')
-rw-r--r--cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java33
-rw-r--r--cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy36
2 files changed, 52 insertions, 17 deletions
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
index d62421c5af..c13422dc4d 100644
--- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
+++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
@@ -234,8 +234,8 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
} catch (final PathParsingException e) {
throw new CpsPathException(e.getMessage());
}
- return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity,
- normalizedXpath);
+
+ return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, normalizedXpath);
}
}
@@ -345,8 +345,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
} catch (final StaleStateException staleStateException) {
throw new ConcurrencyException("Concurrent Transactions",
String.format("dataspace :'%s', Anchor : '%s' and xpath: '%s' is updated by another transaction.",
- dataspaceName, anchorName, dataNode.getXpath()),
- staleStateException);
+ dataspaceName, anchorName, dataNode.getXpath()));
}
}
@@ -354,6 +353,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
public void updateDataNodesAndDescendants(final String dataspaceName,
final String anchorName,
final List<DataNode> dataNodes) {
+
final Map<DataNode, FragmentEntity> dataNodeFragmentEntityMap = dataNodes.stream()
.collect(Collectors.toMap(
dataNode -> dataNode, dataNode -> getFragmentByXpath(dataspaceName, anchorName, dataNode.getXpath())));
@@ -362,10 +362,27 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
try {
fragmentRepository.saveAll(dataNodeFragmentEntityMap.values());
} catch (final StaleStateException staleStateException) {
- throw new ConcurrencyException("Concurrent Transactions",
- String.format("A data node in dataspace :'%s' with Anchor : '%s' is updated by another transaction.",
- dataspaceName, anchorName),
- staleStateException);
+ retryUpdateDataNodesIndividually(dataspaceName, anchorName, dataNodeFragmentEntityMap.values());
+ }
+ }
+
+ private void retryUpdateDataNodesIndividually(final String dataspaceName, final String anchorName,
+ final Collection<FragmentEntity> fragmentEntities) {
+ final Collection<String> failedXpaths = new HashSet<>();
+
+ fragmentEntities.forEach(dataNodeFragment -> {
+ try {
+ fragmentRepository.save(dataNodeFragment);
+ } catch (final StaleStateException e) {
+ failedXpaths.add(dataNodeFragment.getXpath());
+ }
+ });
+
+ if (!failedXpaths.isEmpty()) {
+ final String failedXpathsConcatenated = String.join(",", failedXpaths);
+ throw new ConcurrencyException("Concurrent Transactions", String.format(
+ "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.",
+ failedXpathsConcatenated, dataspaceName, anchorName));
}
}
diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
index 1bbf358e54..470b03afdf 100644
--- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
+++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
@@ -86,17 +86,25 @@ class CpsDataPersistenceServiceSpec extends Specification {
assert concurrencyException.getDetails().contains('/some/xpath')
}
- def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() {
- given: 'the fragment repository returns a list of fragment entities'
- mockFragmentRepository.getByDataspaceAndAnchorAndXpath(*_) >> new FragmentEntity()
- and: 'a data node is concurrently updated by another transaction'
+ def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() {
+ given: 'the system contains and can update one datanode'
+ def dataNode1 = mockDataNodeAndFragmentEntity('/node1', 'OK')
+ and: 'the system contains two more datanodes that throw an exception while updating'
+ def dataNode2 = mockDataNodeAndFragmentEntity('/node2', 'EXCEPTION')
+ def dataNode3 = mockDataNodeAndFragmentEntity('/node3', 'EXCEPTION')
+ and: 'the batch update will therefore also fail'
mockFragmentRepository.saveAll(*_) >> { throw new StaleStateException("concurrent updates") }
- when: 'attempt to update data node with submitted data nodes'
- objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [])
+ when: 'attempt batch update data nodes'
+ objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [dataNode1, dataNode2, dataNode3])
then: 'concurrency exception is thrown'
- def concurrencyException = thrown(ConcurrencyException)
- assert concurrencyException.getDetails().contains('some-dataspace')
- assert concurrencyException.getDetails().contains('some-anchor')
+ def thrown = thrown(ConcurrencyException)
+ assert thrown.message == 'Concurrent Transactions'
+ and: 'it does not contain the successfull datanode'
+ assert !thrown.details.contains('/node1')
+ and: 'it contains the failed datanodes'
+ assert thrown.details.contains('/node2')
+ assert thrown.details.contains('/node3')
+
}
def 'Retrieving a data node with a property JSON value of #scenario'() {
@@ -193,4 +201,14 @@ class CpsDataPersistenceServiceSpec extends Specification {
assert fragmentEntities.size() == 2
}})
}
+
+ def mockDataNodeAndFragmentEntity(xpath, scenario) {
+ def dataNode = new DataNodeBuilder().withXpath(xpath).build()
+ def fragmentEntity = new FragmentEntity(xpath: xpath, childFragments: [])
+ mockFragmentRepository.getByDataspaceAndAnchorAndXpath(_, _, xpath) >> fragmentEntity
+ if ('EXCEPTION' == scenario) {
+ mockFragmentRepository.save(fragmentEntity) >> { throw new StaleStateException("concurrent updates") }
+ }
+ return dataNode
+ }
} \ No newline at end of file