summaryrefslogtreecommitdiffstats
path: root/cps-ri
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ri')
-rw-r--r--cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java110
-rw-r--r--cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java15
-rwxr-xr-xcps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy37
-rw-r--r--cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy36
4 files changed, 144 insertions, 54 deletions
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
index 61e1d5b56..c13422dc4 100644
--- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
+++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
@@ -52,6 +52,7 @@ import org.onap.cps.spi.entities.FragmentEntity;
import org.onap.cps.spi.entities.SchemaSetEntity;
import org.onap.cps.spi.entities.YangResourceEntity;
import org.onap.cps.spi.exceptions.AlreadyDefinedException;
+import org.onap.cps.spi.exceptions.AlreadyDefinedExceptionBatch;
import org.onap.cps.spi.exceptions.ConcurrencyException;
import org.onap.cps.spi.exceptions.CpsAdminException;
import org.onap.cps.spi.exceptions.CpsPathException;
@@ -88,48 +89,82 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
Pattern.compile("\\[(\\@([^\\/]{0,9999}))\\]$");
@Override
- @Transactional
public void addChildDataNode(final String dataspaceName, final String anchorName, final String parentNodeXpath,
final DataNode newChildDataNode) {
- addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, Collections.singleton(newChildDataNode));
+ addNewChildDataNode(dataspaceName, anchorName, parentNodeXpath, newChildDataNode);
}
@Override
- @Transactional
public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath,
final Collection<DataNode> newListElements) {
- addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, newListElements);
+ addChildrenDataNodes(dataspaceName, anchorName, parentNodeXpath, newListElements);
}
@Override
- @Transactional
- public void addListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
- final Collection<Collection<DataNode>> newListsElements) {
+ public void addMultipleLists(final String dataspaceName, final String anchorName, final String parentNodeXpath,
+ final Collection<Collection<DataNode>> newLists) {
+ final Collection<String> failedXpaths = new HashSet<>();
+ newLists.forEach(newList -> {
+ try {
+ addChildrenDataNodes(dataspaceName, anchorName, parentNodeXpath, newList);
+ } catch (final AlreadyDefinedExceptionBatch e) {
+ failedXpaths.addAll(e.getAlreadyDefinedXpaths());
+ }
+ });
- newListsElements.forEach(
- newListElement -> addListElements(dataspaceName, anchorName, parentNodeXpath, newListElement));
+ if (!failedXpaths.isEmpty()) {
+ throw new AlreadyDefinedExceptionBatch(failedXpaths);
+ }
}
- private void addChildDataNodes(final String dataspaceName, final String anchorName, final String parentNodeXpath,
- final Collection<DataNode> newChildren) {
+ private void addNewChildDataNode(final String dataspaceName, final String anchorName,
+ final String parentNodeXpath, final DataNode newChild) {
final FragmentEntity parentFragmentEntity = getFragmentByXpath(dataspaceName, anchorName, parentNodeXpath);
+ final FragmentEntity newChildAsFragmentEntity =
+ convertToFragmentWithAllDescendants(parentFragmentEntity.getDataspace(),
+ parentFragmentEntity.getAnchor(), newChild);
+ newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
+ try {
+ fragmentRepository.save(newChildAsFragmentEntity);
+ } catch (final DataIntegrityViolationException e) {
+ throw AlreadyDefinedException.forDataNode(newChild.getXpath(), anchorName, e);
+ }
+
+ }
+
+ private void addChildrenDataNodes(final String dataspaceName, final String anchorName, final String parentNodeXpath,
+ final Collection<DataNode> newChildren) {
+ final FragmentEntity parentFragmentEntity = getFragmentByXpath(dataspaceName, anchorName, parentNodeXpath);
+ final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size());
try {
- final List<FragmentEntity> fragmentEntities = new ArrayList<>();
newChildren.forEach(newChildAsDataNode -> {
- final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(
- parentFragmentEntity.getDataspace(),
- parentFragmentEntity.getAnchor(),
- newChildAsDataNode);
+ final FragmentEntity newChildAsFragmentEntity =
+ convertToFragmentWithAllDescendants(parentFragmentEntity.getDataspace(),
+ parentFragmentEntity.getAnchor(), newChildAsDataNode);
newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
fragmentEntities.add(newChildAsFragmentEntity);
});
fragmentRepository.saveAll(fragmentEntities);
- } catch (final DataIntegrityViolationException exception) {
- final List<String> conflictXpaths = newChildren.stream()
- .map(DataNode::getXpath)
- .collect(Collectors.toList());
- throw AlreadyDefinedException.forDataNodes(conflictXpaths, anchorName, exception);
+ } catch (final DataIntegrityViolationException e) {
+ log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations",
+ e, fragmentEntities.size());
+ retrySavingEachChildIndividually(dataspaceName, anchorName, parentNodeXpath, newChildren);
+ }
+ }
+
+ private void retrySavingEachChildIndividually(final String dataspaceName, final String anchorName,
+ final String parentNodeXpath, final Collection<DataNode> newChildren) {
+ final Collection<String> failedXpaths = new HashSet<>();
+ for (final DataNode newChild : newChildren) {
+ try {
+ addNewChildDataNode(dataspaceName, anchorName, parentNodeXpath, newChild);
+ } catch (final AlreadyDefinedException e) {
+ failedXpaths.add(newChild.getXpath());
+ }
+ }
+ if (!failedXpaths.isEmpty()) {
+ throw new AlreadyDefinedExceptionBatch(failedXpaths);
}
}
@@ -199,8 +234,8 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
} catch (final PathParsingException e) {
throw new CpsPathException(e.getMessage());
}
- return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity,
- normalizedXpath);
+
+ return fragmentRepository.getByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, normalizedXpath);
}
}
@@ -310,8 +345,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
} catch (final StaleStateException staleStateException) {
throw new ConcurrencyException("Concurrent Transactions",
String.format("dataspace :'%s', Anchor : '%s' and xpath: '%s' is updated by another transaction.",
- dataspaceName, anchorName, dataNode.getXpath()),
- staleStateException);
+ dataspaceName, anchorName, dataNode.getXpath()));
}
}
@@ -319,6 +353,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
public void updateDataNodesAndDescendants(final String dataspaceName,
final String anchorName,
final List<DataNode> dataNodes) {
+
final Map<DataNode, FragmentEntity> dataNodeFragmentEntityMap = dataNodes.stream()
.collect(Collectors.toMap(
dataNode -> dataNode, dataNode -> getFragmentByXpath(dataspaceName, anchorName, dataNode.getXpath())));
@@ -327,10 +362,27 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
try {
fragmentRepository.saveAll(dataNodeFragmentEntityMap.values());
} catch (final StaleStateException staleStateException) {
- throw new ConcurrencyException("Concurrent Transactions",
- String.format("A data node in dataspace :'%s' with Anchor : '%s' is updated by another transaction.",
- dataspaceName, anchorName),
- staleStateException);
+ retryUpdateDataNodesIndividually(dataspaceName, anchorName, dataNodeFragmentEntityMap.values());
+ }
+ }
+
+ private void retryUpdateDataNodesIndividually(final String dataspaceName, final String anchorName,
+ final Collection<FragmentEntity> fragmentEntities) {
+ final Collection<String> failedXpaths = new HashSet<>();
+
+ fragmentEntities.forEach(dataNodeFragment -> {
+ try {
+ fragmentRepository.save(dataNodeFragment);
+ } catch (final StaleStateException e) {
+ failedXpaths.add(dataNodeFragment.getXpath());
+ }
+ });
+
+ if (!failedXpaths.isEmpty()) {
+ final String failedXpathsConcatenated = String.join(",", failedXpaths);
+ throw new ConcurrencyException("Concurrent Transactions", String.format(
+ "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.",
+ failedXpathsConcatenated, dataspaceName, anchorName));
}
}
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java
index 4489cddd3..654c1c085 100644
--- a/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java
+++ b/cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java
@@ -20,7 +20,6 @@
package org.onap.cps.spi.repository;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,12 +28,14 @@ import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.onap.cps.cpspath.parser.CpsPathPrefixType;
import org.onap.cps.cpspath.parser.CpsPathQuery;
import org.onap.cps.spi.entities.FragmentEntity;
import org.onap.cps.utils.JsonObjectMapper;
@RequiredArgsConstructor
+@Slf4j
public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCpsPathQuery {
public static final String REGEX_ABSOLUTE_PATH_PREFIX = ".*\\/";
@@ -62,16 +63,8 @@ public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCps
addTextFunctionCondition(cpsPathQuery, sqlStringBuilder, queryParameters);
final Query query = entityManager.createNativeQuery(sqlStringBuilder.toString(), FragmentEntity.class);
setQueryParameters(query, queryParameters);
- return getFragmentEntitiesAsStream(query);
- }
-
- private List<FragmentEntity> getFragmentEntitiesAsStream(final Query query) {
- final List<FragmentEntity> fragmentEntities = new ArrayList<>();
- query.getResultStream().forEach(fragmentEntity -> {
- fragmentEntities.add((FragmentEntity) fragmentEntity);
- entityManager.detach(fragmentEntity);
- });
-
+ final List<FragmentEntity> fragmentEntities = query.getResultList();
+ log.debug("Fetched {} fragment entities by anchor and cps path.", fragmentEntities.size());
return fragmentEntities;
}
diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy
index acc243b5b..5e15ca795 100755
--- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy
+++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceIntegrationSpec.groovy
@@ -27,6 +27,7 @@ import org.onap.cps.cpspath.parser.PathParsingException
import org.onap.cps.spi.CpsDataPersistenceService
import org.onap.cps.spi.entities.FragmentEntity
import org.onap.cps.spi.exceptions.AlreadyDefinedException
+import org.onap.cps.spi.exceptions.AlreadyDefinedExceptionBatch
import org.onap.cps.spi.exceptions.AnchorNotFoundException
import org.onap.cps.spi.exceptions.CpsAdminException
import org.onap.cps.spi.exceptions.CpsPathException
@@ -48,6 +49,7 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase {
CpsDataPersistenceService objectUnderTest
static final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+ static final DataNodeBuilder dataNodeBuilder = new DataNodeBuilder()
static final String SET_DATA = '/data/fragment.sql'
static final int DATASPACE_1001_ID = 1001L
@@ -165,7 +167,7 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase {
def grandChild = buildDataNode('/parent-201/child-204[@key="NEW1"]/grand-child-204[@key2="NEW1-CHILD"]', [leave:'value'], [])
listElements[0].childDataNodes = [grandChild]
when: 'the new data node (list elements) are added to an existing parent node'
- objectUnderTest.addListElementsBatch(DATASPACE_NAME, ANCHOR_NAME3, '/parent-201', [listElements])
+ objectUnderTest.addMultipleLists(DATASPACE_NAME, ANCHOR_NAME3, '/parent-201', [listElements])
then: 'new entries are successfully persisted, parent node now contains 5 children (2 new + 3 existing before)'
def parentFragment = fragmentRepository.getById(LIST_DATA_NODE_PARENT201_FRAGMENT_ID)
def allChildXpaths = parentFragment.childFragments.collect { it.xpath }
@@ -179,17 +181,41 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase {
}
@Sql([CLEAR_DATA, SET_DATA])
+ def 'Add multiple list with a mix of existing and new elements'() {
+ given: 'two new child list elements for an existing parent'
+ def existingDataNode = dataNodeBuilder.withXpath('/parent-100/child-001').withLeaves(['id': '001']).build()
+ def newDataNode1 = dataNodeBuilder.withXpath('/parent-100/child-new1').withLeaves(['id': 'new1']).build()
+ def newDataNode2 = dataNodeBuilder.withXpath('/parent-200/child-new2').withLeaves(['id': 'new2']).build()
+ def dataNodeList1 = [existingDataNode, newDataNode1]
+ def dataNodeList2 = [newDataNode2]
+ when: 'duplicate data node is requested to be added'
+ objectUnderTest.addMultipleLists(DATASPACE_NAME, ANCHOR_NAME3, '/', [dataNodeList1,dataNodeList2])
+ then: 'already defined batch exception is thrown'
+ def thrown = thrown(AlreadyDefinedExceptionBatch)
+ and: 'it only contains the xpath(s) of the duplicated elements'
+ assert thrown.alreadyDefinedXpaths.size() == 1
+ assert thrown.alreadyDefinedXpaths.contains('/parent-100/child-001')
+ and: 'it does NOT contains the xpaths of the new element that were not combined with existing elements'
+ assert !thrown.alreadyDefinedXpaths.contains('/parent-100/child-new1')
+ assert !thrown.alreadyDefinedXpaths.contains('/parent-100/child-new1')
+ and: 'the new entity is inserted correctly'
+ def dataspaceEntity = dataspaceRepository.getByName(DATASPACE_NAME)
+ def anchorEntity = anchorRepository.getByDataspaceAndName(dataspaceEntity, ANCHOR_NAME3)
+ fragmentRepository.findByDataspaceAndAnchorAndXpath(dataspaceEntity, anchorEntity, '/parent-200/child-new2').isPresent()
+ }
+
+ @Sql([CLEAR_DATA, SET_DATA])
def 'Add list element error scenario: #scenario.'() {
given: 'list element as a collection of data nodes'
- def listElementCollection = toDataNodes(listElementXpaths)
+ def listElements = toDataNodes(listElementXpaths)
when: 'attempt to add list elements to parent node'
- objectUnderTest.addListElements(DATASPACE_NAME, ANCHOR_NAME3, parentNodeXpath, listElementCollection)
+ objectUnderTest.addListElements(DATASPACE_NAME, ANCHOR_NAME3, parentNodeXpath, listElements)
then: 'a #expectedException is thrown'
thrown(expectedException)
where: 'following parameters were used'
scenario | parentNodeXpath | listElementXpaths || expectedException
'parent node does not exist' | '/unknown' | ['irrelevant'] || DataNodeNotFoundException
- 'data fragment already exists' | '/parent-201' | ["/parent-201/child-204[@key='A']"] || AlreadyDefinedException
+ 'data fragment already exists' | '/parent-201' | ["/parent-201/child-204[@key='A']"] || AlreadyDefinedExceptionBatch
}
@Sql([CLEAR_DATA, SET_DATA])
@@ -559,8 +585,9 @@ class CpsDataPersistenceServiceIntegrationSpec extends CpsPersistenceSpecBase {
return xpaths.collect { new DataNodeBuilder().withXpath(it).build() }
}
+
static DataNode buildDataNode(xpath, leaves, childDataNodes) {
- return new DataNodeBuilder().withXpath(xpath).withLeaves(leaves).withChildDataNodes(childDataNodes).build()
+ return dataNodeBuilder.withXpath(xpath).withLeaves(leaves).withChildDataNodes(childDataNodes).build()
}
static Map<String, Object> getLeavesMap(FragmentEntity fragmentEntity) {
diff --git a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
index 1bbf358e5..470b03afd 100644
--- a/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
+++ b/cps-ri/src/test/groovy/org/onap/cps/spi/impl/CpsDataPersistenceServiceSpec.groovy
@@ -86,17 +86,25 @@ class CpsDataPersistenceServiceSpec extends Specification {
assert concurrencyException.getDetails().contains('/some/xpath')
}
- def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() {
- given: 'the fragment repository returns a list of fragment entities'
- mockFragmentRepository.getByDataspaceAndAnchorAndXpath(*_) >> new FragmentEntity()
- and: 'a data node is concurrently updated by another transaction'
+ def 'Handling of StaleStateException (caused by concurrent updates) during update data nodes and descendants.'() {
+ given: 'the system contains and can update one datanode'
+ def dataNode1 = mockDataNodeAndFragmentEntity('/node1', 'OK')
+ and: 'the system contains two more datanodes that throw an exception while updating'
+ def dataNode2 = mockDataNodeAndFragmentEntity('/node2', 'EXCEPTION')
+ def dataNode3 = mockDataNodeAndFragmentEntity('/node3', 'EXCEPTION')
+ and: 'the batch update will therefore also fail'
mockFragmentRepository.saveAll(*_) >> { throw new StaleStateException("concurrent updates") }
- when: 'attempt to update data node with submitted data nodes'
- objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [])
+ when: 'attempt batch update data nodes'
+ objectUnderTest.updateDataNodesAndDescendants('some-dataspace', 'some-anchor', [dataNode1, dataNode2, dataNode3])
then: 'concurrency exception is thrown'
- def concurrencyException = thrown(ConcurrencyException)
- assert concurrencyException.getDetails().contains('some-dataspace')
- assert concurrencyException.getDetails().contains('some-anchor')
+ def thrown = thrown(ConcurrencyException)
+ assert thrown.message == 'Concurrent Transactions'
+ and: 'it does not contain the successfull datanode'
+ assert !thrown.details.contains('/node1')
+ and: 'it contains the failed datanodes'
+ assert thrown.details.contains('/node2')
+ assert thrown.details.contains('/node3')
+
}
def 'Retrieving a data node with a property JSON value of #scenario'() {
@@ -193,4 +201,14 @@ class CpsDataPersistenceServiceSpec extends Specification {
assert fragmentEntities.size() == 2
}})
}
+
+ def mockDataNodeAndFragmentEntity(xpath, scenario) {
+ def dataNode = new DataNodeBuilder().withXpath(xpath).build()
+ def fragmentEntity = new FragmentEntity(xpath: xpath, childFragments: [])
+ mockFragmentRepository.getByDataspaceAndAnchorAndXpath(_, _, xpath) >> fragmentEntity
+ if ('EXCEPTION' == scenario) {
+ mockFragmentRepository.save(fragmentEntity) >> { throw new StaleStateException("concurrent updates") }
+ }
+ return dataNode
+ }
} \ No newline at end of file