1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
/*
* ============LICENSE_START=======================================================
* Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the 'License');
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.cps.integration.functional
import java.time.OffsetDateTime
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.spi.FetchDescendantsOption
import org.onap.cps.spi.exceptions.AlreadyDefinedException
import org.onap.cps.spi.exceptions.AnchorNotFoundException
class CpsAnchorServiceIntegrationSpec extends CpsIntegrationSpecBase {
CpsAnchorService objectUnderTest
def setup() { objectUnderTest = cpsAnchorService }
def 'Anchor CRUD operations.'() {
when: 'an anchor is created'
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, BOOKSTORE_SCHEMA_SET, 'newAnchor')
then: 'the anchor be read'
assert objectUnderTest.getAnchor(GENERAL_TEST_DATASPACE, 'newAnchor').name == 'newAnchor'
and: 'it can be deleted'
objectUnderTest.deleteAnchor(GENERAL_TEST_DATASPACE,'newAnchor')
then: 'the anchor no longer exists i.e. an exception is thrown if an attempt is made to retrieve it'
def thrown = null
try {
objectUnderTest.getAnchor(GENERAL_TEST_DATASPACE, 'newAnchor')
} catch(Exception exception) {
thrown = exception
}
assert thrown instanceof AnchorNotFoundException
}
def 'Filtering multiple anchors.'() {
when: '2 anchors with bookstore schema set are created'
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, BOOKSTORE_SCHEMA_SET, 'anchor1')
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, BOOKSTORE_SCHEMA_SET, 'anchor2')
and: '1 anchor with "other" schema set is created'
createStandardBookStoreSchemaSet(GENERAL_TEST_DATASPACE, 'otherSchemaSet')
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, 'otherSchemaSet', 'anchor3')
then: 'there are 3 anchors in the general test database'
assert objectUnderTest.getAnchors(GENERAL_TEST_DATASPACE).size() == 3
and: 'there are 2 anchors associated with bookstore schema set'
assert objectUnderTest.getAnchors(GENERAL_TEST_DATASPACE, BOOKSTORE_SCHEMA_SET).size() == 2
and: 'there is 1 anchor associated with other schema set'
assert objectUnderTest.getAnchors(GENERAL_TEST_DATASPACE, 'otherSchemaSet').size() == 1
}
def 'Querying anchor(name)s (depends on previous test!).'() {
expect: 'there are now 3 anchors using the "stores" module (both schema sets use the same modules) '
assert objectUnderTest.queryAnchorNames(GENERAL_TEST_DATASPACE, ['stores', 'bookstore-types']).size() == 3
and: 'there are no anchors using both "stores" and a "unused-model"'
assert objectUnderTest.queryAnchorNames(GENERAL_TEST_DATASPACE, ['stores', 'unused-model']).size() == 0
}
def 'Duplicate anchors.'() {
given: 'an anchor is created'
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, BOOKSTORE_SCHEMA_SET, 'newAnchor')
when: 'attempt to create another anchor with the same name'
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, BOOKSTORE_SCHEMA_SET, 'newAnchor')
then: 'an exception is thrown that the anchor already is defined'
thrown(AlreadyDefinedException)
cleanup:
objectUnderTest.deleteAnchor(GENERAL_TEST_DATASPACE, 'newAnchor')
}
def 'Query anchors without any known modules'() {
when: 'querying for anchors with #scenario'
def result = objectUnderTest.queryAnchorNames(GENERAL_TEST_DATASPACE, ['unknownModule'])
then: 'an empty result is returned (no error)'
assert result == []
}
def 'Update anchor schema set.'() {
when: 'a new schema set with tree yang model is created'
def newTreeYangModelAsString = readResourceDataFile('tree/new-test-tree.yang')
cpsModuleService.createSchemaSet(GENERAL_TEST_DATASPACE, 'newTreeSchemaSet', [tree: newTreeYangModelAsString])
then: 'an anchor with new schema set is created'
objectUnderTest.createAnchor(GENERAL_TEST_DATASPACE, 'newTreeSchemaSet', 'anchor4')
and: 'the new tree datanode is saved'
def treeJsonData = readResourceDataFile('tree/new-test-tree.json')
cpsDataService.saveData(GENERAL_TEST_DATASPACE, 'anchor4', treeJsonData, OffsetDateTime.now())
and: 'saved tree data node can be retrieved by its normalized xpath'
def branchName = cpsDataService.getDataNodes(GENERAL_TEST_DATASPACE, 'anchor4', "/test-tree/branch", FetchDescendantsOption.DIRECT_CHILDREN_ONLY)[0].leaves['name']
assert branchName == 'left'
and: 'a another schema set with updated tree yang model is created'
def updatedTreeYangModelAsString = readResourceDataFile('tree/updated-test-tree.yang')
cpsModuleService.createSchemaSet(GENERAL_TEST_DATASPACE, 'anotherTreeSchemaSet', [tree: updatedTreeYangModelAsString])
and: 'anchor4 schema set is updated with another schema set successfully'
objectUnderTest.updateAnchorSchemaSet(GENERAL_TEST_DATASPACE, 'anchor4', 'anotherTreeSchemaSet')
when: 'updated tree data node with new leaves'
def updatedTreeJsonData = readResourceDataFile('tree/updated-test-tree.json')
cpsDataService.updateNodeLeaves(GENERAL_TEST_DATASPACE, "anchor4", "/test-tree/branch[@name='left']", updatedTreeJsonData, OffsetDateTime.now())
then: 'updated tree data node can be retrieved by its normalized xpath'
def birdsName = cpsDataService.getDataNodes(GENERAL_TEST_DATASPACE, 'anchor4',"/test-tree/branch[@name='left']/nest", FetchDescendantsOption.DIRECT_CHILDREN_ONLY)[0].leaves['birds'] as List
assert birdsName.size() == 3
assert birdsName.containsAll('Night Owl', 'Raven', 'Crow')
}
}
|