diff options
Diffstat (limited to 'cps-ri')
4 files changed, 144 insertions, 54 deletions
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java index 61e1d5b569..c13422dc4d 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java @@ -52,6 +52,7 @@ import org.onap.cps.spi.entities.FragmentEntity; import org.onap.cps.spi.entities.SchemaSetEntity; import org.onap.cps.spi.entities.YangResourceEntity; import org.onap.cps.spi.exceptions.AlreadyDefinedException; +import org.onap.cps.spi.exceptions.AlreadyDefinedExceptionBatch; import org.onap.cps.spi.exceptions.ConcurrencyException; import org.onap.cps.spi.exceptions.CpsAdminException; import org.onap.cps.spi.exceptions.CpsPathException; @@ -88,48 +89,82 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService Pattern.compile("\\[(\\@([^\\/]{0,9999}))\\]$"); @Override - @Transactional public void addChildDataNode(final String dataspaceName, final String anchorName, final String parentNodeXpath, final DataNode newChildDataNode) { - addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, Collections.singleton(newChildDataNode)); + addNewChildDataNode(dataspaceName, anchorName, parentNodeXpath, newChildDataNode); } @Override - @Transactional public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath, final Collection<DataNode> newListElements) { - addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, newListElements); + addChildrenDataNodes(dataspaceName, anchorName, parentNodeXpath, newListElements); } @Override - @Transactional - public void addListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final Collection<Collection<DataNode>> newListsElements) { + public void addMultipleLists(final String dataspaceName, final String anchorName, final String parentNodeXpath, + final Collection<Collection<DataNode>> newLists) { + final Collection<String> failedXpaths = new HashSet<>(); + newLists.forEach(newList -> { + try { + addChildrenDataNodes(dataspaceName, anchorName, parentNodeXpath, newList); + } catch (final AlreadyDefinedExceptionBatch e) { + failedXpaths.addAll(e.getAlreadyDefinedXpaths()); + } + }); - newListsElements.forEach( - newListElement -> addListElements(dataspaceName, anchorName, parentNodeXpath, newListElement)); + if (!failedXpaths.isEmpty()) { + throw new AlreadyDefinedExceptionBatch(failedXpaths); + } } - private void addChildDataNodes(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final Collection<DataNode> newChildren) { + private void addNewChildDataNode(final String dataspaceName, final String anchorName, + final String parentNodeXpath, final DataNode newChild) { final FragmentEntity parentFragmentEntity = getFragmentByXpath(dataspaceName, anchorName, parentNodeXpath); + final FragmentEntity newChildAsFragmentEntity = + convertToFragmentWithAllDescendants(parentFragmentEntity.getDataspace(), + parentFragmentEntity.getAnchor(), newChild); + newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId()); + try { + fragmentRepository.save(newChildAsFragmentEntity); + } catch (final DataIntegrityViolationException e) { + throw AlreadyDefinedException.forDataNode(newChild.getXpath(), anchorName, e); + } + + } + + private void addChildrenDataNodes(final String dataspaceName, final String anchorName, final String parentNodeXpath, + final Collection<DataNode> newChildren) { + final FragmentEntity parentFragmentEntity = getFragmentByXpath(dataspaceName, anchorName, parentNodeXpath); + final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size()); try { - final List<FragmentEntity> fragmentEntities = new ArrayList<>(); newChildren.forEach(newChildAsDataNode -> { - final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants( - parentFragmentEntity.getDataspace(), - parentFragmentEntity.getAnchor(), - newChildAsDataNode); + final FragmentEntity newChildAsFragmentEntity = + convertToFragmentWithAllDescendants(parentFragmentEntity.getDataspace(), + parentFragmentEntity.getAnchor(), newChildAsDataNode); newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId()); fragmentEntities.add(newChildAsFragmentEntity); }); fragmentRepository.saveAll(fragmentEntities); - } catch (final DataIntegrityViolationException exception) { - final List<String> conflictXpaths = newChildren.stream() - .map(DataNode::getXpath) - .collect(Collectors.toList()); - throw AlreadyDefinedException.forDataNodes(conflictXpaths, anchorName, exception); + } catch (final DataIntegrityViolationException e) { + log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations", + e, fragmentEntities.size()); + retrySavingEachChildIndividually(dataspaceName, anchorName, parentNodeXpath, newChildren); + } + } + + private void retrySavingEachChildIndividually(final String dataspaceName, final String anchorName, + final String parentNodeXpath, final Collection<DataNode> newChildren) { + final Collection<String> failedXpaths = new HashSet<>(); + for (final DataNode newChild : newChildren) { + try { + addNewChildDataNode(dataspaceName, anchorName, parentNodeXpath, newChild); + } catch (final AlreadyDefinedException e) { + failedXpaths.add(newChild.getXpath()); + } + } + if (!failedXpaths.isEmpty()) { + throw new AlreadyDefinedExceptionBatch(failedXpaths); } } @@ -199,8 +234,8 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService } catch (final PathParsingException e) { throw new CpsPathException(e.getMessage()); } - return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, - normalizedXpath); + + return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, normalizedXpath); } } @@ -310,8 +345,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService } catch (final StaleStateException staleStateException) { throw new ConcurrencyException("Concurrent Transactions", String.format("dataspace :'%s', Anchor : '%s' and xpath: '%s' is updated by another transaction.", - dataspaceName, anchorName, dataNode.getXpath()), - staleStateException); + dataspaceName, anchorName, dataNode.getXpath())); } } @@ -319,6 +353,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName, final List<DataNode> dataNodes) { + final Map<DataNode, FragmentEntity> dataNodeFragmentEntityMap = dataNodes.stream() .collect(Collectors.toMap( dataNode -> dataNode, dataNode -> getFragmentByXpath(dataspaceName, anchorName, dataNode.getXpath()))); @@ -327,10 +362,27 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService try { fragmentRepository.saveAll(dataNodeFragmentEntityMap.values()); } catch (final StaleStateException staleStateException) { - throw new ConcurrencyException("Concurrent Transactions", - String.format("A data node in dataspace :'%s' with Anchor : '%s' is updated by another transaction.", - dataspaceName, anchorName), - staleStateException); + retryUpdateDataNodesIndividually(dataspaceName, anchorName, dataNodeFragmentEntityMap.values()); + } + } + + private void retryUpdateDataNodesIndividually(final String dataspaceName, final String anchorName, + final Collection<FragmentEntity> fragmentEntities) { + final Collection<String> failedXpaths = new HashSet<>(); + + fragmentEntities.forEach(dataNodeFragment -> { + try { + fragmentRepository.save(dataNodeFragment); + } catch (final StaleStateException e) { + failedXpaths.add(dataNodeFragment.getXpath()); + } + }); + + if (!failedXpaths.isEmpty()) { + final String failedXpathsConcatenated = String.join(",", failedXpaths); + throw new ConcurrencyException("Concurrent Transactions", String.format( + "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.", + failedXpathsConcatenated, dataspaceName, anchorName)); } } diff --git a/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java index 4489cddd30..654c1c0854 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java @@ -20,7 +20,6 @@ package org.onap.cps.spi.repository; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,12 +28,14 @@ import javax.persistence.PersistenceContext; import javax.persistence.Query; import javax.transaction.Transactional; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.onap.cps.cpspath.parser.CpsPathPrefixType; import org.onap.cps.cpspath.parser.CpsPathQuery; import org.onap.cps.spi.entities.FragmentEntity; import org.onap.cps.utils.JsonObjectMapper; @RequiredArgsConstructor +@Slf4j public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCpsPathQuery { public static final String REGEX_ABSOLUTE_PATH_PREFIX = ".*\\/"; @@ -62,16 +63,8 @@ public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCps addTextFunctionCondition(cpsPathQuery, sqlStringBuilder, queryParameters); final Query query = entityManager.createNativeQuery(sqlStringBuilder.toString(), FragmentEntity.class); setQueryParameters(query, queryParameters); - return getFragmentEntitiesAsStream(query); - } - - private List<FragmentEntity> getFragmentEntitiesAsStream(final Query query) { - final List<FragmentEntity> fragmentEntities = new ArrayList<>(); - query.getResultStream().forEach(fragmentEntity -> { - fragmentEntities.add((FragmentEntity) fragmentEntity); - entityManager.detach(fragmentEntity); - }); - + final List<FragmentEntity> fragmentEntities = query.getResultList(); + log.debug("Fetched {} fragment entities by anchor and cps path.", fragmentEntities.size()); return fragmentEntities; } diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy index acc243b5b4..5e15ca795f 100755 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy @@ -27,6 +27,7 @@ import org.onap.cps.cpspath.parser.PathParsingException import org.onap.cps.spi.CpsDataPersistenceService import org.onap.cps.spi.entities.FragmentEntity import org.onap.cps.spi.exceptions.AlreadyDefinedException +import org.onap.cps.spi.exceptions.AlreadyDefinedExceptionBatch import org.onap.cps.spi.exceptions.AnchorNotFoundException import org.onap.cps.spi.exceptions.CpsAdminException import org.onap.cps.spi.exceptions.CpsPathException @@ -48,6 +49,7 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase { CpsDataPersistenceService objectUnderTest static final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) + static final DataNodeBuilder dataNodeBuilder = new DataNodeBuilder() static final String SET_DATA = '/data/fragment.sql' static final int DATASPACE_1001_ID = 1001L @@ -165,7 +167,7 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase { def grandChild = buildDataNode('/parent-201/child-204[@key="NEW1"]/grand-child-204[@key2="NEW1-CHILD"]', [leave:'value'], []) listElements[0].childDataNodes = [grandChild] when: 'the new data node (list elements) are added to an existing parent node' - objectUnderTest.addListElementsBatch(DATASPACE_NAME, ANCHOR_NAME3, '/parent-201', [listElements]) + objectUnderTest.addMultipleLists(DATASPACE_NAME, ANCHOR_NAME3, '/parent-201', [listElements]) then: 'new entries are successfully persisted, parent node now contains 5 children (2 new + 3 existing before)' def parentFragment = fragmentRepository.getById(LIST_DATA_NODE_PARENT201_FRAGMENT_ID) def allChildXpaths = parentFragment.childFragments.collect { it.xpath } @@ -179,17 +181,41 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase { } @Sql([CLEAR_DATA, SET_DATA]) + def 'Add multiple list with a mix of existing and new elements'() { + given: 'two new child list elements for an existing parent' + def existingDataNode = dataNodeBuilder.withXpath('/parent-100/child-001').withLeaves(['id': '001']).build() + def newDataNode1 = dataNodeBuilder.withXpath('/parent-100/child-new1').withLeaves(['id': 'new1']).build() + def newDataNode2 = dataNodeBuilder.withXpath('/parent-200/child-new2').withLeaves(['id': 'new2']).build() + def dataNodeList1 = [existingDataNode, newDataNode1] + def dataNodeList2 = [newDataNode2] + when: 'duplicate data node is requested to be added' + objectUnderTest.addMultipleLists(DATASPACE_NAME, ANCHOR_NAME3, '/', [dataNodeList1,dataNodeList2]) + then: 'already defined batch exception is thrown' + def thrown = thrown(AlreadyDefinedExceptionBatch) + and: 'it only contains the xpath(s) of the duplicated elements' + assert thrown.alreadyDefinedXpaths.size() == 1 + assert thrown.alreadyDefinedXpaths.contains('/parent-100/child-001') + and: 'it does NOT contains the xpaths of the new element that were not combined with existing elements' + assert !thrown.alreadyDefinedXpaths.contains('/parent-100/child-new1') + assert !thrown.alreadyDefinedXpaths.contains('/parent-100/child-new1') + and: 'the new entity is inserted correctly' + def dataspaceEntity = dataspaceRepository.getByName(DATASPACE_NAME) + def anchorEntity = anchorRepository.getByDataspaceAndName(dataspaceEntity, ANCHOR_NAME3) + fragmentRepository.findByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, '/parent-200/child-new2').isPresent() + } + + @Sql([CLEAR_DATA, SET_DATA]) def 'Add list element error scenario: #scenario.'() { given: 'list element as a collection of data nodes' - def listElementCollection = toDataNodes(listElementXpaths) + def listElements = toDataNodes(listElementXpaths) when: 'attempt to add list elements to parent node' - objectUnderTest.addListElements(DATASPACE_NAME, ANCHOR_NAME3, parentNodeXpath, listElementCollection) + objectUnderTest.addListElements(DATASPACE_NAME, ANCHOR_NAME3, parentNodeXpath, listElements) then: 'a #expectedException is thrown' thrown(expectedException) where: 'following parameters were used' scenario | parentNodeXpath | listElementXpaths || expectedException 'parent node does not exist' | '/unknown' | ['irrelevant'] || DataNodeNotFoundException - 'data fragment already exists' | '/parent-201' | ["/parent-201/child-204[@key='A']"] || AlreadyDefinedException + 'data fragment already exists' | '/parent-201' | ["/parent-201/child-204[@key='A']"] || AlreadyDefinedExceptionBatch } @Sql([CLEAR_DATA, SET_DATA]) @@ -559,8 +585,9 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase { return xpaths.collect { new DataNodeBuilder().withXpath(it).build() } } + static DataNode buildDataNode(xpath, leaves, childDataNodes) { - return new DataNodeBuilder().withXpath(xpath).withLeaves(leaves).withChildDataNodes(childDataNodes).build() + return dataNodeBuilder.withXpath(xpath).withLeaves(leaves).withChildDataNodes(childDataNodes).build() } static Map<String, Object> getLeavesMap(FragmentEntity fragmentEntity) { diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy index 1bbf358e54..470b03afdf 100644 --- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy +++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy @@ -86,17 +86,25 @@ class CpsDataPersistenceServiceSpec extends Specification { assert concurrencyException.getDetails().contains('/some/xpath') } - def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() { - given: 'the fragment repository returns a list of fragment entities' - mockFragmentRepository.getByDataspaceAndAnchorAndXpath(*_) >> new FragmentEntity() - and: 'a data node is concurrently updated by another transaction' + def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() { + given: 'the system contains and can update one datanode' + def dataNode1 = mockDataNodeAndFragmentEntity('/node1', 'OK') + and: 'the system contains two more datanodes that throw an exception while updating' + def dataNode2 = mockDataNodeAndFragmentEntity('/node2', 'EXCEPTION') + def dataNode3 = mockDataNodeAndFragmentEntity('/node3', 'EXCEPTION') + and: 'the batch update will therefore also fail' mockFragmentRepository.saveAll(*_) >> { throw new StaleStateException("concurrent updates") } - when: 'attempt to update data node with submitted data nodes' - objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', []) + when: 'attempt batch update data nodes' + objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [dataNode1, dataNode2, dataNode3]) then: 'concurrency exception is thrown' - def concurrencyException = thrown(ConcurrencyException) - assert concurrencyException.getDetails().contains('some-dataspace') - assert concurrencyException.getDetails().contains('some-anchor') + def thrown = thrown(ConcurrencyException) + assert thrown.message == 'Concurrent Transactions' + and: 'it does not contain the successfull datanode' + assert !thrown.details.contains('/node1') + and: 'it contains the failed datanodes' + assert thrown.details.contains('/node2') + assert thrown.details.contains('/node3') + } def 'Retrieving a data node with a property JSON value of #scenario'() { @@ -193,4 +201,14 @@ class CpsDataPersistenceServiceSpec extends Specification { assert fragmentEntities.size() == 2 }}) } + + def mockDataNodeAndFragmentEntity(xpath, scenario) { + def dataNode = new DataNodeBuilder().withXpath(xpath).build() + def fragmentEntity = new FragmentEntity(xpath: xpath, childFragments: []) + mockFragmentRepository.getByDataspaceAndAnchorAndXpath(_, _, xpath) >> fragmentEntity + if ('EXCEPTION' == scenario) { + mockFragmentRepository.save(fragmentEntity) >> { throw new StaleStateException("concurrent updates") } + } + return dataNode + } }
\ No newline at end of file |