diff options
19 files changed, 548 insertions, 561 deletions
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy index ae7c56458d..f06af6c10a 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,19 +37,27 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService) + def setup() { + objectUnderTest.timeOutInMilliSeconds = 100 + } + def 'Attempt to execute async get request with #scenario.'() { given: 'notification feature is turned on/off' objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled + and: ' a flag to track the network service call' + def networkServiceMethodCalled = false + and: 'the (mocked) service will use the flag to indicate if it is called' + mockNetworkCmProxyDataService.getResourceDataForCmHandle('ds', 'ch1', 'resource1', 'options', _, _) >> { + networkServiceMethodCalled = true + } when: 'get request is executed with topic = #topic' objectUnderTest.executeRequest('ds', 'ch1', 'resource1', 'options', topic, false) - and: 'wait a little for async execution (only if expected)' - if (expectedCalls > 0) { - Thread.sleep(500) - } then: 'the task is executed in an async fashion or not' expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_) - /*and: 'the service request is always invoked' - 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle('ds', 'ch1', 'resource1', 'options', _, _)*/ + and: 'the service request is always invoked within 1 seconds' + new PollingConditions().within(1) { + assert networkServiceMethodCalled == true + } where: 'the following parameters are used' scenario | notificationFeatureEnabled | topic || expectedCalls 'feature on, valid topic' | true | 'valid' || 1 @@ -89,9 +97,9 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { objectUnderTest.executeRequest('myTopic', dataOperationRequest) then: 'the task is executed in an async fashion' 1 * spiedCpsNcmpTaskExecutor.executeTask(*_) - and: 'the network service is invoked (wait max. 5 seconds)' - new PollingConditions(timeout: 30).eventually { - //TODO Fix test assertion + and: 'the network service is invoked within 1 seconds' + new PollingConditions().within(1) { + assert networkServiceMethodCalled == true } where: 'the following datastores are used' datastore << ['ncmp-datastore:passthrough-running', 'ncmp-datastore:passthrough-operational'] diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java new file mode 100644 index 0000000000..8bc36943a3 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent; + +import io.cloudevents.CloudEvent; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class CmSubscriptionNcmpInEventConsumer { + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + + @Value("${ncmp.model-loader.subscription:false}") + private boolean subscriptionModelLoaderEnabled; + + /** + * Consume the specified event. + * + * @param subscriptionEventConsumerRecord the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") + public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) { + final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value(); + final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent = + toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class); + if (subscriptionModelLoaderEnabled) { + log.info("Subscription with name {} to be mapped to hazelcast object...", + cmSubscriptionNcmpInEvent.getData().getSubscriptionId()); + } + if ("subscriptionCreated".equals(cloudEvent.getType()) && cmSubscriptionNcmpInEvent != null) { + log.info("Subscription for ClientID {} with name {} ...", + cloudEvent.getSource(), + cmSubscriptionNcmpInEvent.getData().getSubscriptionId()); + } + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java index a31332f094..0ed95adff2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -164,12 +164,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState } private void setInitialStates(final YangModelCmHandle yangModelCmHandle) { - CompositeStateUtils.setInitialDataStoreSyncState().accept(yangModelCmHandle.getCompositeState()); - CompositeStateUtils.setCompositeState(READY).accept(yangModelCmHandle.getCompositeState()); + CompositeStateUtils.setInitialDataStoreSyncState(yangModelCmHandle.getCompositeState()); + CompositeStateUtils.setCompositeState(READY, yangModelCmHandle.getCompositeState()); } private void retryCmHandle(final YangModelCmHandle yangModelCmHandle) { - CompositeStateUtils.setCompositeStateForRetry().accept(yangModelCmHandle.getCompositeState()); + CompositeStateUtils.setCompositeStateForRetry(yangModelCmHandle.getCompositeState()); } private void registerNewCmHandle(final YangModelCmHandle yangModelCmHandle) { @@ -178,7 +178,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState } private void setCmHandleState(final YangModelCmHandle yangModelCmHandle, final CmHandleState targetCmHandleState) { - CompositeStateUtils.setCompositeState(targetCmHandleState).accept(yangModelCmHandle.getCompositeState()); + CompositeStateUtils.setCompositeState(targetCmHandleState, yangModelCmHandle.getCompositeState()); } private boolean isNew(final CompositeState existingCompositeState) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java index 99cca8c0b3..35ad54fdef 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ package org.onap.cps.ncmp.api.impl.inventory; -import java.util.function.Consumer; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -34,31 +33,23 @@ public class CompositeStateUtils { /** * Sets the cmHandleState to the provided state and updates the timestamp. - * - * @return Updated CompositeState */ - public static Consumer<CompositeState> setCompositeState(final CmHandleState cmHandleState) { - return compositeState -> { - compositeState.setCmHandleState(cmHandleState); - compositeState.setLastUpdateTimeNow(); - }; + public static void setCompositeState(final CmHandleState cmHandleState, + final CompositeState compositeState) { + compositeState.setCmHandleState(cmHandleState); + compositeState.setLastUpdateTimeNow(); } /** * Set the Operational datastore sync state based on the global flag. - * - * @return Updated CompositeState */ - public static Consumer<CompositeState> setInitialDataStoreSyncState() { - - return compositeState -> { - compositeState.setDataSyncEnabled(false); - final CompositeState.Operational operational = - getInitialDataStoreSyncState(compositeState.getDataSyncEnabled()); - final CompositeState.DataStores dataStores = - CompositeState.DataStores.builder().operationalDataStore(operational).build(); - compositeState.setDataStores(dataStores); - }; + public static void setInitialDataStoreSyncState(final CompositeState compositeState) { + compositeState.setDataSyncEnabled(false); + final CompositeState.Operational operational = + getInitialDataStoreSyncState(compositeState.getDataSyncEnabled()); + final CompositeState.DataStores dataStores = + CompositeState.DataStores.builder().operationalDataStore(operational).build(); + compositeState.setDataStores(dataStores); } /** @@ -91,19 +82,15 @@ public class CompositeStateUtils { /** * Sets the cmHandleState to ADVISED and retain the lock details. Used in retry scenarios. - * - * @return Updated CompositeState */ - public static Consumer<CompositeState> setCompositeStateForRetry() { - return compositeState -> { - compositeState.setCmHandleState(CmHandleState.ADVISED); - compositeState.setLastUpdateTimeNow(); - final String oldLockReasonDetails = compositeState.getLockReason().getDetails(); - final CompositeState.LockReason lockReason = - CompositeState.LockReason.builder() - .lockReasonCategory(compositeState.getLockReason().getLockReasonCategory()) - .details(oldLockReasonDetails).build(); - compositeState.setLockReason(lockReason); - }; + public static void setCompositeStateForRetry(final CompositeState compositeState) { + compositeState.setCmHandleState(CmHandleState.ADVISED); + compositeState.setLastUpdateTimeNow(); + final String oldLockReasonDetails = compositeState.getLockReason().getDetails(); + final CompositeState.LockReason lockReason = + CompositeState.LockReason.builder() + .lockReasonCategory(compositeState.getLockReason().getLockReasonCategory()) + .details(oldLockReasonDetails).build(); + compositeState.setLockReason(lockReason); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java index 81055db847..88ba5e91be 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java @@ -45,13 +45,6 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader { private static final String DATASTORE_PASSTHROUGH_OPERATIONAL = "ncmp-datastores:passthrough-operational"; private static final String DATASTORE_PASSTHROUGH_RUNNING = "ncmp-datastores:passthrough-running"; - private static final String DEPRECATED_MODEL_FILENAME = "subscription.yang"; - private static final String DEPRECATED_ANCHOR_NAME = "AVC-Subscriptions"; - private static final String DEPRECATED_SCHEMASET_NAME = "subscriptions"; - private static final String DEPRECATED_REGISTRY_DATANODE_NAME = "subscription-registry"; - - - public CmDataSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService, final CpsModuleService cpsModuleService, final CpsAnchorService cpsAnchorService, @@ -74,10 +67,6 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader { } private void onboardSubscriptionModels() { - createSchemaSet(NCMP_DATASPACE_NAME, DEPRECATED_SCHEMASET_NAME, DEPRECATED_MODEL_FILENAME); - createAnchor(NCMP_DATASPACE_NAME, DEPRECATED_SCHEMASET_NAME, DEPRECATED_ANCHOR_NAME); - createTopLevelDataNode(NCMP_DATASPACE_NAME, DEPRECATED_ANCHOR_NAME, DEPRECATED_REGISTRY_DATANODE_NAME); - createSchemaSet(NCMP_DATASPACE_NAME, SCHEMASET_NAME, MODEL_FILENAME); createAnchor(NCMP_DATASPACE_NAME, SCHEMASET_NAME, ANCHOR_NAME); createTopLevelDataNode(NCMP_DATASPACE_NAME, ANCHOR_NAME, REGISTRY_DATANODE_NAME); diff --git a/cps-ncmp-service/src/main/resources/models/subscription.yang b/cps-ncmp-service/src/main/resources/models/subscription.yang deleted file mode 100644 index 7096c18abc..0000000000 --- a/cps-ncmp-service/src/main/resources/models/subscription.yang +++ /dev/null @@ -1,57 +0,0 @@ -module subscription { - yang-version 1.1; - namespace "org:onap:ncmp:subscription"; - - prefix subs; - - revision "2023-03-21" { - description - "NCMP subscription model"; - } - - container subscription-registry { - list subscription { - key "clientID subscriptionName"; - - leaf clientID { - type string; - } - - leaf subscriptionName { - type string; - } - - leaf topic { - type string; - } - - leaf isTagged { - type boolean; - } - - container predicates { - - list targetCmHandles { - key "cmHandleId"; - - leaf cmHandleId { - type string; - } - - leaf status { - type string; - } - - leaf details { - type string; - } - } - - leaf datastore { - type string; - } - } - - } - } -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy new file mode 100644 index 0000000000..57e77eba31 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy @@ -0,0 +1,93 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription + +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender +import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.builder.CloudEventBuilder +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest + +@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) +class CmSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpec { + + def objectUnderTest = new CmSubscriptionNcmpInEventConsumer() + def logger = Spy(ListAppender<ILoggingEvent>) + + @Autowired + JsonObjectMapper jsonObjectMapper + + @Autowired + ObjectMapper objectMapper + + @BeforeEach + void setup() { + ((Logger) LoggerFactory.getLogger(CmSubscriptionNcmpInEventConsumer.class)).addAppender(logger); + logger.start(); + } + + @AfterEach + void teardown() { + ((Logger) LoggerFactory.getLogger(CmSubscriptionNcmpInEventConsumer.class)).detachAndStopAllAppenders(); + } + + + def 'Consume valid CMSubscription create message'() { + given: 'a cmsubscription event' + def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmSubscriptionNcmpInEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('subscriptionCreated') + .withType('subscriptionCreated') + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent) + and: 'notifications are enabled' + objectUnderTest.notificationFeatureEnabled = true + and: 'subscription model loader is enabled' + objectUnderTest.subscriptionModelLoaderEnabled = true + when: 'the valid event is consumed' + objectUnderTest.consumeSubscriptionEvent(consumerRecord) + then: 'an event is logged with level INFO' + def loggingEvent = getLoggingEvent() + assert loggingEvent.level == Level.INFO + and: 'the log indicates the task completed successfully' + assert loggingEvent.formattedMessage == 'Subscription with name cm-subscription-001 to be mapped to hazelcast object...' + } + + def getLoggingEvent() { + return logger.list[0] + } + +} diff --git a/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json new file mode 100644 index 0000000000..5246618820 --- /dev/null +++ b/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json @@ -0,0 +1,23 @@ +{ + "data": { + "subscriptionId": "cm-subscription-001", + "predicates": [ + { + "targetFilter": [ + "CMHandle1", + "CMHandle2", + "CMHandle3" + ], + "scopeFilter": { + "datastore": "ncmp-datastore:passthrough-running", + "xpath-filter": [ + "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/", + "//_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction//", + "//_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU//", + "//_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//" + ] + } + } + ] + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json deleted file mode 100644 index f31362a1c6..0000000000 --- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "data": { - "subscription": { - "clientID": "SCO-9989752", - "name": "cm-subscription-001" - }, - "dataType": { - "dataspace": "ALL", - "dataCategory": "CM", - "dataProvider": "CM-SERVICE" - }, - "predicates": { - "targets":[ - { - "id":"CMHandle2", - "additional-properties":{ - "Books":"Novel" - } - }, - { - "id":"CMHandle1", - "additional-properties":{ - "Books":"Social Media" - } - } - ], - "datastore": "passthrough-running", - "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//" - } - } -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json deleted file mode 100644 index ae14b5ca21..0000000000 --- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "data": { - "clientId": "SCO-9989752", - "subscriptionName": "cm-subscription-001", - "dmiName": "dminame1", - "subscriptionStatus": [ - { - "id": "CMHandle1", - "status": "REJECTED", - "details": "Some error message from the DMI" - }, - { - "id": "CMHandle2", - "status": "REJECTED", - "details": "Some other error message from the DMI" - } - ] - } -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json deleted file mode 100644 index c38cb79211..0000000000 --- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "clientId": "SCO-9989752", - "subscriptionName": "cm-subscription-001", - "cmSubscriptionStatus": [ - { - "id": "CMHandle1", - "status": "REJECTED", - "details": "Some error message from the DMI" - }, - { - "id": "CMHandle2", - "status": "REJECTED", - "details": "Some other error message from the DMI" - }, - { - "id": "CMHandle3", - "status": "PENDING", - "details": "Some error causes pending" - }, - { - "id": "CMHandle4", - "status": "PENDING", - "details": "Some other error happened" - }, - { - "id": "CMHandle5", - "status": "PENDING", - "details": "Some other error happened" - } - ] -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json deleted file mode 100644 index 803fa48bdf..0000000000 --- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "data": { - "subscription": { - "clientID": "SCO-9989752", - "name": "cm-subscription-001" - }, - "dataType": { - "dataspace": "ALL", - "dataCategory": "CM", - "dataProvider": "CM-SERVICE" - }, - "predicates": { - "targets": [ - "CMHandle1", - "CMHandle2", - "CMHandle3" - ], - "datastore": "ncmp-datastore:passthrough-running", - "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//" - } - } -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json deleted file mode 100644 index 856f238c6e..0000000000 --- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "data": { - "statusCode": 104, - "statusMessage": "partially applied subscription", - "additionalInfo": { - "rejected": [ - { - "details": "Some other error message from the DMI", - "targets": ["CMHandle2"] - }, - { - "details": "Some error message from the DMI", - "targets": ["CMHandle1"] - } - ], - "pending": [ - { - "details": "Some other error happened", - "targets": ["CMHandle4", "CMHandle5"] - }, - { - "details": "Some error causes pending", - "targets": ["CMHandle3"] - } - ] - } - } -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json deleted file mode 100644 index 35ff0241df..0000000000 --- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "data": { - "statusCode": 104, - "statusMessage": "partially applied subscription", - "additionalInfo": { - "rejected": [ - { - "details": "Cm handle does not exist", - "targets": ["CMHandle1"] - } - ], - "pending": [ - { - "details": "Subscription forwarded to dmi plugin", - "targets": ["CMHandle3"] - } - ] - } - } -}
\ No newline at end of file diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java index 1cfe21d3a2..fd47793a7a 100644 --- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java +++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java @@ -84,67 +84,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService private static final String REG_EX_FOR_OPTIONAL_LIST_INDEX = "(\\[@.+?])?)"; @Override - public void addChildDataNodes(final String dataspaceName, final String anchorName, - final String parentNodeXpath, final Collection<DataNode> dataNodes) { - final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); - addChildrenDataNodes(anchorEntity, parentNodeXpath, dataNodes); - } - - @Override - public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final Collection<DataNode> newListElements) { - final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); - addChildrenDataNodes(anchorEntity, parentNodeXpath, newListElements); - } - - private void addNewChildDataNode(final AnchorEntity anchorEntity, final String parentNodeXpath, - final DataNode newChild) { - final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath); - final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(anchorEntity, newChild); - newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId()); - try { - fragmentRepository.save(newChildAsFragmentEntity); - } catch (final DataIntegrityViolationException dataIntegrityViolationException) { - throw AlreadyDefinedException.forDataNodes(Collections.singletonList(newChild.getXpath()), - anchorEntity.getName()); - } - } - - private void addChildrenDataNodes(final AnchorEntity anchorEntity, final String parentNodeXpath, - final Collection<DataNode> newChildren) { - final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath); - final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size()); - try { - for (final DataNode newChildAsDataNode : newChildren) { - final FragmentEntity newChildAsFragmentEntity = - convertToFragmentWithAllDescendants(anchorEntity, newChildAsDataNode); - newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId()); - fragmentEntities.add(newChildAsFragmentEntity); - } - fragmentRepository.saveAll(fragmentEntities); - } catch (final DataIntegrityViolationException dataIntegrityViolationException) { - log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations", - dataIntegrityViolationException, fragmentEntities.size()); - retrySavingEachChildIndividually(anchorEntity, parentNodeXpath, newChildren); - } - } - - private void retrySavingEachChildIndividually(final AnchorEntity anchorEntity, final String parentNodeXpath, - final Collection<DataNode> newChildren) { - final Collection<String> failedXpaths = new HashSet<>(); - for (final DataNode newChild : newChildren) { - try { - addNewChildDataNode(anchorEntity, parentNodeXpath, newChild); - } catch (final AlreadyDefinedException alreadyDefinedException) { - failedXpaths.add(newChild.getXpath()); - } - } - if (!failedXpaths.isEmpty()) { - throw AlreadyDefinedException.forDataNodes(failedXpaths, anchorEntity.getName()); - } - } - - @Override public void storeDataNodes(final String dataspaceName, final String anchorName, final Collection<DataNode> dataNodes) { final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); @@ -157,7 +96,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService fragmentRepository.saveAll(fragmentEntities); } catch (final DataIntegrityViolationException exception) { log.warn("Exception occurred : {} , While saving : {} data nodes, Retrying saving data nodes individually", - exception, dataNodes.size()); + exception, dataNodes.size()); storeDataNodesIndividually(anchorEntity, dataNodes); } } @@ -197,79 +136,153 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService return parentFragment; } - private FragmentEntity toFragmentEntity(final AnchorEntity anchorEntity, final DataNode dataNode) { - return FragmentEntity.builder() - .anchor(anchorEntity) - .xpath(dataNode.getXpath()) - .attributes(jsonObjectMapper.asJsonString(dataNode.getLeaves())) - .build(); + @Override + public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath, + final Collection<DataNode> newListElements) { + final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); + addChildrenDataNodes(anchorEntity, parentNodeXpath, newListElements); } @Override - @Timed(value = "cps.data.persistence.service.datanode.get", - description = "Time taken to get a data node") - public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName, - final String xpath, - final FetchDescendantsOption fetchDescendantsOption) { - final String targetXpath = getNormalizedXpath(xpath); - final Collection<DataNode> dataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName, - Collections.singletonList(targetXpath), fetchDescendantsOption); - if (dataNodes.isEmpty()) { - throw new DataNodeNotFoundException(dataspaceName, anchorName, xpath); + public void addChildDataNodes(final String dataspaceName, final String anchorName, + final String parentNodeXpath, final Collection<DataNode> dataNodes) { + final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); + addChildrenDataNodes(anchorEntity, parentNodeXpath, dataNodes); + } + + private void addChildrenDataNodes(final AnchorEntity anchorEntity, final String parentNodeXpath, + final Collection<DataNode> newChildren) { + final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath); + final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size()); + try { + for (final DataNode newChildAsDataNode : newChildren) { + final FragmentEntity newChildAsFragmentEntity = + convertToFragmentWithAllDescendants(anchorEntity, newChildAsDataNode); + newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId()); + fragmentEntities.add(newChildAsFragmentEntity); + } + fragmentRepository.saveAll(fragmentEntities); + } catch (final DataIntegrityViolationException dataIntegrityViolationException) { + log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations", + dataIntegrityViolationException, fragmentEntities.size()); + retrySavingEachChildIndividually(anchorEntity, parentNodeXpath, newChildren); } - return dataNodes; } - @Override - @Timed(value = "cps.data.persistence.service.datanode.batch.get", - description = "Time taken to get data nodes") - public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName, - final Collection<String> xpaths, - final FetchDescendantsOption fetchDescendantsOption) { - final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); - Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpaths); - fragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(fetchDescendantsOption, - fragmentEntities); - return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities); + private void addNewChildDataNode(final AnchorEntity anchorEntity, final String parentNodeXpath, + final DataNode newChild) { + final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath); + final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(anchorEntity, newChild); + newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId()); + try { + fragmentRepository.save(newChildAsFragmentEntity); + } catch (final DataIntegrityViolationException dataIntegrityViolationException) { + throw AlreadyDefinedException.forDataNodes(Collections.singletonList(newChild.getXpath()), + anchorEntity.getName()); + } } - private Collection<FragmentEntity> getFragmentEntities(final AnchorEntity anchorEntity, - final Collection<String> xpaths) { - final Collection<String> normalizedXpaths = getNormalizedXpaths(xpaths); + private void retrySavingEachChildIndividually(final AnchorEntity anchorEntity, final String parentNodeXpath, + final Collection<DataNode> newChildren) { + final Collection<String> failedXpaths = new HashSet<>(); + for (final DataNode newChild : newChildren) { + try { + addNewChildDataNode(anchorEntity, parentNodeXpath, newChild); + } catch (final AlreadyDefinedException alreadyDefinedException) { + failedXpaths.add(newChild.getXpath()); + } + } + if (!failedXpaths.isEmpty()) { + throw AlreadyDefinedException.forDataNodes(failedXpaths, anchorEntity.getName()); + } + } - final boolean haveRootXpath = normalizedXpaths.removeIf(CpsDataPersistenceServiceImpl::isRootXpath); + @Override + public void batchUpdateDataLeaves(final String dataspaceName, final String anchorName, + final Map<String, Map<String, Serializable>> updatedLeavesPerXPath) { + final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); - final List<FragmentEntity> fragmentEntities = fragmentRepository.findByAnchorAndXpathIn(anchorEntity, - normalizedXpaths); + final Collection<String> xpathsOfUpdatedLeaves = updatedLeavesPerXPath.keySet(); + final Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpathsOfUpdatedLeaves); for (final FragmentEntity fragmentEntity : fragmentEntities) { - normalizedXpaths.remove(fragmentEntity.getXpath()); + final Map<String, Serializable> updatedLeaves = updatedLeavesPerXPath.get(fragmentEntity.getXpath()); + final String mergedLeaves = mergeLeaves(updatedLeaves, fragmentEntity.getAttributes()); + fragmentEntity.setAttributes(mergedLeaves); } - for (final String xpath : normalizedXpaths) { - if (!CpsPathUtil.isPathToListElement(xpath)) { - fragmentEntities.addAll(fragmentRepository.findListByAnchorAndXpath(anchorEntity, xpath)); - } + try { + fragmentRepository.saveAll(fragmentEntities); + } catch (final StaleStateException staleStateException) { + retryUpdateDataNodesIndividually(anchorEntity, fragmentEntities); } + } - if (haveRootXpath) { - fragmentEntities.addAll(fragmentRepository.findRootsByAnchorId(anchorEntity.getId())); + @Override + public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName, + final Collection<DataNode> updatedDataNodes) { + final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); + + final Map<String, DataNode> xpathToUpdatedDataNode = updatedDataNodes.stream() + .collect(Collectors.toMap(DataNode::getXpath, dataNode -> dataNode)); + + final Collection<String> xpaths = xpathToUpdatedDataNode.keySet(); + Collection<FragmentEntity> existingFragmentEntities = getFragmentEntities(anchorEntity, xpaths); + existingFragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities( + FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS, existingFragmentEntities); + + for (final FragmentEntity existingFragmentEntity : existingFragmentEntities) { + final DataNode updatedDataNode = xpathToUpdatedDataNode.get(existingFragmentEntity.getXpath()); + updateFragmentEntityAndDescendantsWithDataNode(existingFragmentEntity, updatedDataNode); } - return fragmentEntities; + try { + fragmentRepository.saveAll(existingFragmentEntities); + } catch (final StaleStateException staleStateException) { + retryUpdateDataNodesIndividually(anchorEntity, existingFragmentEntities); + } } - private FragmentEntity getFragmentEntity(final AnchorEntity anchorEntity, final String xpath) { - final FragmentEntity fragmentEntity; - if (isRootXpath(xpath)) { - fragmentEntity = fragmentRepository.findOneByAnchorId(anchorEntity.getId()).orElse(null); - } else { - fragmentEntity = fragmentRepository.getByAnchorAndXpath(anchorEntity, getNormalizedXpath(xpath)); + private void retryUpdateDataNodesIndividually(final AnchorEntity anchorEntity, + final Collection<FragmentEntity> fragmentEntities) { + final Collection<String> failedXpaths = new HashSet<>(); + for (final FragmentEntity dataNodeFragment : fragmentEntities) { + try { + fragmentRepository.save(dataNodeFragment); + } catch (final StaleStateException staleStateException) { + failedXpaths.add(dataNodeFragment.getXpath()); + } } - if (fragmentEntity == null) { - throw new DataNodeNotFoundException(anchorEntity.getDataspace().getName(), anchorEntity.getName(), xpath); + if (!failedXpaths.isEmpty()) { + final String failedXpathsConcatenated = String.join(",", failedXpaths); + throw new ConcurrencyException("Concurrent Transactions", String.format( + "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.", + failedXpathsConcatenated, anchorEntity.getDataspace().getName(), anchorEntity.getName())); } - return fragmentEntity; + } + + private void updateFragmentEntityAndDescendantsWithDataNode(final FragmentEntity existingFragmentEntity, + final DataNode newDataNode) { + copyAttributesFromNewDataNode(existingFragmentEntity, newDataNode); + + final Map<String, FragmentEntity> existingChildrenByXpath = existingFragmentEntity.getChildFragments().stream() + .collect(Collectors.toMap(FragmentEntity::getXpath, childFragmentEntity -> childFragmentEntity)); + + final Collection<FragmentEntity> updatedChildFragments = new HashSet<>(); + for (final DataNode newDataNodeChild : newDataNode.getChildDataNodes()) { + final FragmentEntity childFragment; + if (isNewDataNode(newDataNodeChild, existingChildrenByXpath)) { + childFragment = convertToFragmentWithAllDescendants(existingFragmentEntity.getAnchor(), + newDataNodeChild); + } else { + childFragment = existingChildrenByXpath.get(newDataNodeChild.getXpath()); + updateFragmentEntityAndDescendantsWithDataNode(childFragment, newDataNodeChild); + } + updatedChildFragments.add(childFragment); + } + + existingFragmentEntity.getChildFragments().clear(); + existingFragmentEntity.getChildFragments().addAll(updatedChildFragments); } @Override @@ -338,11 +351,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities); } - private List<Long> getAnchorIdsForPagination(final DataspaceEntity dataspaceEntity, final CpsPathQuery cpsPathQuery, - final PaginationOption paginationOption) { - return fragmentRepository.findAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption); - } - private List<DataNode> createDataNodesFromFragmentEntities(final FetchDescendantsOption fetchDescendantsOption, final Collection<FragmentEntity> fragmentEntities) { final List<DataNode> dataNodes = new ArrayList<>(fragmentEntities.size()); @@ -352,29 +360,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService return Collections.unmodifiableList(dataNodes); } - private static String getNormalizedXpath(final String xpathSource) { - if (isRootXpath(xpathSource)) { - return xpathSource; - } - try { - return CpsPathUtil.getNormalizedXpath(xpathSource); - } catch (final PathParsingException pathParsingException) { - throw new CpsPathException(pathParsingException.getMessage()); - } - } - - private static Collection<String> getNormalizedXpaths(final Collection<String> xpaths) { - final Collection<String> normalizedXpaths = new HashSet<>(xpaths.size()); - for (final String xpath : xpaths) { - try { - normalizedXpaths.add(getNormalizedXpath(xpath)); - } catch (final CpsPathException cpsPathException) { - log.warn("Error parsing xpath \"{}\": {}", xpath, cpsPathException.getMessage()); - } - } - return normalizedXpaths; - } - @Override public String startSession() { return sessionManager.startSession(); @@ -404,21 +389,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService return anchorIdList.size(); } - private static Set<String> processAncestorXpath(final Collection<FragmentEntity> fragmentEntities, - final CpsPathQuery cpsPathQuery) { - final Set<String> ancestorXpath = new HashSet<>(); - final Pattern pattern = - Pattern.compile("(.*/" + Pattern.quote(cpsPathQuery.getAncestorSchemaNodeIdentifier()) - + REG_EX_FOR_OPTIONAL_LIST_INDEX + "/.*"); - for (final FragmentEntity fragmentEntity : fragmentEntities) { - final Matcher matcher = pattern.matcher(fragmentEntity.getXpath()); - if (matcher.matches()) { - ancestorXpath.add(matcher.group(1)); - } - } - return ancestorXpath; - } - private DataNode toDataNode(final FragmentEntity fragmentEntity, final FetchDescendantsOption fetchDescendantsOption) { final List<DataNode> childDataNodes = getChildDataNodes(fragmentEntity, fetchDescendantsOption); @@ -434,103 +404,15 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService .withChildDataNodes(childDataNodes).build(); } - private List<DataNode> getChildDataNodes(final FragmentEntity fragmentEntity, - final FetchDescendantsOption fetchDescendantsOption) { - if (fetchDescendantsOption.hasNext()) { - return fragmentEntity.getChildFragments().stream() - .map(childFragmentEntity -> toDataNode(childFragmentEntity, fetchDescendantsOption.next())) - .collect(Collectors.toList()); - } - return Collections.emptyList(); - } - - @Override - public void batchUpdateDataLeaves(final String dataspaceName, final String anchorName, - final Map<String, Map<String, Serializable>> updatedLeavesPerXPath) { - final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); - - final Collection<String> xpathsOfUpdatedLeaves = updatedLeavesPerXPath.keySet(); - final Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpathsOfUpdatedLeaves); - - for (final FragmentEntity fragmentEntity : fragmentEntities) { - final Map<String, Serializable> updatedLeaves = updatedLeavesPerXPath.get(fragmentEntity.getXpath()); - final String mergedLeaves = mergeLeaves(updatedLeaves, fragmentEntity.getAttributes()); - fragmentEntity.setAttributes(mergedLeaves); - } - - try { - fragmentRepository.saveAll(fragmentEntities); - } catch (final StaleStateException staleStateException) { - retryUpdateDataNodesIndividually(anchorEntity, fragmentEntities); - } - } - - @Override - public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName, - final Collection<DataNode> updatedDataNodes) { - final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); - - final Map<String, DataNode> xpathToUpdatedDataNode = updatedDataNodes.stream() - .collect(Collectors.toMap(DataNode::getXpath, dataNode -> dataNode)); - - final Collection<String> xpaths = xpathToUpdatedDataNode.keySet(); - Collection<FragmentEntity> existingFragmentEntities = getFragmentEntities(anchorEntity, xpaths); - existingFragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities( - FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS, existingFragmentEntities); - - for (final FragmentEntity existingFragmentEntity : existingFragmentEntities) { - final DataNode updatedDataNode = xpathToUpdatedDataNode.get(existingFragmentEntity.getXpath()); - updateFragmentEntityAndDescendantsWithDataNode(existingFragmentEntity, updatedDataNode); - } - - try { - fragmentRepository.saveAll(existingFragmentEntities); - } catch (final StaleStateException staleStateException) { - retryUpdateDataNodesIndividually(anchorEntity, existingFragmentEntities); - } - } - - private void retryUpdateDataNodesIndividually(final AnchorEntity anchorEntity, - final Collection<FragmentEntity> fragmentEntities) { - final Collection<String> failedXpaths = new HashSet<>(); - for (final FragmentEntity dataNodeFragment : fragmentEntities) { - try { - fragmentRepository.save(dataNodeFragment); - } catch (final StaleStateException staleStateException) { - failedXpaths.add(dataNodeFragment.getXpath()); - } - } - if (!failedXpaths.isEmpty()) { - final String failedXpathsConcatenated = String.join(",", failedXpaths); - throw new ConcurrencyException("Concurrent Transactions", String.format( - "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.", - failedXpathsConcatenated, anchorEntity.getDataspace().getName(), anchorEntity.getName())); - } + private FragmentEntity toFragmentEntity(final AnchorEntity anchorEntity, final DataNode dataNode) { + return FragmentEntity.builder() + .anchor(anchorEntity) + .xpath(dataNode.getXpath()) + .attributes(jsonObjectMapper.asJsonString(dataNode.getLeaves())) + .build(); } - private void updateFragmentEntityAndDescendantsWithDataNode(final FragmentEntity existingFragmentEntity, - final DataNode newDataNode) { - copyAttributesFromNewDataNode(existingFragmentEntity, newDataNode); - - final Map<String, FragmentEntity> existingChildrenByXpath = existingFragmentEntity.getChildFragments().stream() - .collect(Collectors.toMap(FragmentEntity::getXpath, childFragmentEntity -> childFragmentEntity)); - - final Collection<FragmentEntity> updatedChildFragments = new HashSet<>(); - for (final DataNode newDataNodeChild : newDataNode.getChildDataNodes()) { - final FragmentEntity childFragment; - if (isNewDataNode(newDataNodeChild, existingChildrenByXpath)) { - childFragment = convertToFragmentWithAllDescendants(existingFragmentEntity.getAnchor(), - newDataNodeChild); - } else { - childFragment = existingChildrenByXpath.get(newDataNodeChild.getXpath()); - updateFragmentEntityAndDescendantsWithDataNode(childFragment, newDataNodeChild); - } - updatedChildFragments.add(childFragment); - } - existingFragmentEntity.getChildFragments().clear(); - existingFragmentEntity.getChildFragments().addAll(updatedChildFragments); - } @Override @Transactional @@ -636,6 +518,116 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService } } + @Override + @Timed(value = "cps.data.persistence.service.datanode.get", + description = "Time taken to get a data node") + public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName, + final String xpath, + final FetchDescendantsOption fetchDescendantsOption) { + final String targetXpath = getNormalizedXpath(xpath); + final Collection<DataNode> dataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName, + Collections.singletonList(targetXpath), fetchDescendantsOption); + if (dataNodes.isEmpty()) { + throw new DataNodeNotFoundException(dataspaceName, anchorName, xpath); + } + return dataNodes; + } + + @Override + @Timed(value = "cps.data.persistence.service.datanode.batch.get", + description = "Time taken to get data nodes") + public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName, + final Collection<String> xpaths, + final FetchDescendantsOption fetchDescendantsOption) { + final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName); + Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpaths); + fragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(fetchDescendantsOption, + fragmentEntities); + return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities); + } + + private List<DataNode> getChildDataNodes(final FragmentEntity fragmentEntity, + final FetchDescendantsOption fetchDescendantsOption) { + if (fetchDescendantsOption.hasNext()) { + return fragmentEntity.getChildFragments().stream() + .map(childFragmentEntity -> toDataNode(childFragmentEntity, fetchDescendantsOption.next())) + .collect(Collectors.toList()); + } + return Collections.emptyList(); + } + + private AnchorEntity getAnchorEntity(final String dataspaceName, final String anchorName) { + final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName); + return anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName); + } + + private List<Long> getAnchorIdsForPagination(final DataspaceEntity dataspaceEntity, final CpsPathQuery cpsPathQuery, + final PaginationOption paginationOption) { + return fragmentRepository.findAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption); + } + + private static String getNormalizedXpath(final String xpathSource) { + if (isRootXpath(xpathSource)) { + return xpathSource; + } + try { + return CpsPathUtil.getNormalizedXpath(xpathSource); + } catch (final PathParsingException pathParsingException) { + throw new CpsPathException(pathParsingException.getMessage()); + } + } + + private static Collection<String> getNormalizedXpaths(final Collection<String> xpaths) { + final Collection<String> normalizedXpaths = new HashSet<>(xpaths.size()); + for (final String xpath : xpaths) { + try { + normalizedXpaths.add(getNormalizedXpath(xpath)); + } catch (final CpsPathException cpsPathException) { + log.warn("Error parsing xpath \"{}\": {}", xpath, cpsPathException.getMessage()); + } + } + return normalizedXpaths; + } + + private FragmentEntity getFragmentEntity(final AnchorEntity anchorEntity, final String xpath) { + final FragmentEntity fragmentEntity; + if (isRootXpath(xpath)) { + fragmentEntity = fragmentRepository.findOneByAnchorId(anchorEntity.getId()).orElse(null); + } else { + fragmentEntity = fragmentRepository.getByAnchorAndXpath(anchorEntity, getNormalizedXpath(xpath)); + } + if (fragmentEntity == null) { + throw new DataNodeNotFoundException(anchorEntity.getDataspace().getName(), anchorEntity.getName(), xpath); + } + return fragmentEntity; + } + + private Collection<FragmentEntity> getFragmentEntities(final AnchorEntity anchorEntity, + final Collection<String> xpaths) { + final Collection<String> normalizedXpaths = getNormalizedXpaths(xpaths); + + final boolean haveRootXpath = normalizedXpaths.removeIf(CpsDataPersistenceServiceImpl::isRootXpath); + + final List<FragmentEntity> fragmentEntities = fragmentRepository.findByAnchorAndXpathIn(anchorEntity, + normalizedXpaths); + + for (final FragmentEntity fragmentEntity : fragmentEntities) { + normalizedXpaths.remove(fragmentEntity.getXpath()); + } + + for (final String xpath : normalizedXpaths) { + if (!CpsPathUtil.isPathToListElement(xpath)) { + fragmentEntities.addAll(fragmentRepository.findListByAnchorAndXpath(anchorEntity, xpath)); + } + } + + if (haveRootXpath) { + fragmentEntities.addAll(fragmentRepository.findRootsByAnchorId(anchorEntity.getId())); + } + + return fragmentEntities; + } + private static String getListElementXpathPrefix(final Collection<DataNode> newListElements) { if (newListElements.isEmpty()) { throw new CpsAdminException("Invalid list replacement", @@ -660,20 +652,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService return existingListElementEntity; } - private static boolean isNewDataNode(final DataNode replacementDataNode, - final Map<String, FragmentEntity> existingListElementsByXpath) { - return !existingListElementsByXpath.containsKey(replacementDataNode.getXpath()); - } - - private void copyAttributesFromNewDataNode(final FragmentEntity existingFragmentEntity, - final DataNode newDataNode) { - final String oldOrderedLeavesAsJson = getOrderedLeavesAsJson(existingFragmentEntity.getAttributes()); - final String newOrderedLeavesAsJson = getOrderedLeavesAsJson(newDataNode.getLeaves()); - if (!oldOrderedLeavesAsJson.equals(newOrderedLeavesAsJson)) { - existingFragmentEntity.setAttributes(jsonObjectMapper.asJsonString(newDataNode.getLeaves())); - } - } - private String getOrderedLeavesAsJson(final Map<String, Serializable> currentLeaves) { final Map<String, Serializable> sortedLeaves = new TreeMap<>(String::compareTo); sortedLeaves.putAll(currentLeaves); @@ -685,7 +663,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService return "{}"; } final Map<String, Serializable> sortedLeaves = jsonObjectMapper.convertJsonString(currentLeavesAsString, - TreeMap.class); + TreeMap.class); return jsonObjectMapper.asJsonString(sortedLeaves); } @@ -696,10 +674,39 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService .collect(Collectors.toMap(FragmentEntity::getXpath, fragmentEntity -> fragmentEntity)); } + private static Set<String> processAncestorXpath(final Collection<FragmentEntity> fragmentEntities, + final CpsPathQuery cpsPathQuery) { + final Set<String> ancestorXpath = new HashSet<>(); + final Pattern pattern = + Pattern.compile("(.*/" + Pattern.quote(cpsPathQuery.getAncestorSchemaNodeIdentifier()) + + REG_EX_FOR_OPTIONAL_LIST_INDEX + "/.*"); + for (final FragmentEntity fragmentEntity : fragmentEntities) { + final Matcher matcher = pattern.matcher(fragmentEntity.getXpath()); + if (matcher.matches()) { + ancestorXpath.add(matcher.group(1)); + } + } + return ancestorXpath; + } + private static boolean isRootXpath(final String xpath) { return "/".equals(xpath) || "".equals(xpath); } + private static boolean isNewDataNode(final DataNode replacementDataNode, + final Map<String, FragmentEntity> existingListElementsByXpath) { + return !existingListElementsByXpath.containsKey(replacementDataNode.getXpath()); + } + + private void copyAttributesFromNewDataNode(final FragmentEntity existingFragmentEntity, + final DataNode newDataNode) { + final String oldOrderedLeavesAsJson = getOrderedLeavesAsJson(existingFragmentEntity.getAttributes()); + final String newOrderedLeavesAsJson = getOrderedLeavesAsJson(newDataNode.getLeaves()); + if (!oldOrderedLeavesAsJson.equals(newOrderedLeavesAsJson)) { + existingFragmentEntity.setAttributes(jsonObjectMapper.asJsonString(newDataNode.getLeaves())); + } + } + private String mergeLeaves(final Map<String, Serializable> updateLeaves, final String currentLeavesAsString) { Map<String, Serializable> currentLeavesAsMap = new HashMap<>(); if (currentLeavesAsString != null) { @@ -712,9 +719,4 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService } return jsonObjectMapper.asJsonString(currentLeavesAsMap); } - - private AnchorEntity getAnchorEntity(final String dataspaceName, final String anchorName) { - final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName); - return anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName); - } } diff --git a/csit/tests/cps-data-operations/cps-data-operations.robot b/csit/tests/cps-data-operations/cps-data-operations.robot index 1f6611955e..17dce16446 100644 --- a/csit/tests/cps-data-operations/cps-data-operations.robot +++ b/csit/tests/cps-data-operations/cps-data-operations.robot @@ -46,18 +46,19 @@ NCMP Data Operation, forwarded to DMI, response on Client Topic ${params}= Create Dictionary topic=${topic} ${headers}= Create Dictionary Content-Type=application/json Authorization=${auth} POST On Session CPS_URL ncmpInventory/v1/ch headers=${headers} data=${newCmHandleRequestBody} - Sleep 8 wait some time to get updated the cm handle state to READY + ${getCmHandleUri}= Set Variable ${ncmpBasePath}/v1/ch/CMHandle1 + ${getCmHandleHeaders}= Create Dictionary Authorization=${auth} + Wait Until Keyword Succeeds 8sec 100ms Is CM Handle READY ${getCmHandleUri} ${getCmHandleHeaders} CMHandle1 ${response}= POST On Session CPS_URL ${uri} params=${params} headers=${headers} data=${dataOperationReqBody} Set Global Variable ${expectedRequestId} ${response.json()}[requestId] Should Be Equal As Strings ${response.status_code} 200 - Sleep 5 wait some time to get published a message to the client topic Consume cloud event from client topic ${group_id}= Create Consumer auto_offset_reset=earliest Subscribe Topic topics=${topic} group_id=${group_id} ${messages}= Poll group_id=${group_id} only_value=false - ${event} Set Variable ${messages}[0] - ${headers} Set Variable ${event.headers()} + ${event} Set Variable ${messages}[0] + ${headers} Set Variable ${event.headers()} FOR ${header_key_value_pair} IN @{headers} Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0" Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_type" "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent" @@ -68,9 +69,19 @@ Consume cloud event from client topic *** Keywords *** Compare Header Values - [Arguments] ${header_key} ${header_value} ${header_to_check} ${expected_header_value} + [Arguments] ${header_key} ${header_value} ${header_to_check} ${expected_header_value} IF "${header_key}" == ${header_to_check} - Should Be Equal As Strings "${header_value}" ${expected_header_value} + Should Be Equal As Strings "${header_value}" ${expected_header_value} + END + +Is CM Handle READY + [Arguments] ${uri} ${headers} ${cmHandle} + ${response}= GET On Session CPS_URL ${uri} headers=${headers} + Should Be Equal As Strings ${response.status_code} 200 + FOR ${item} IN ${response.json()} + IF "${item['cmHandle']}" == "${cmHandle}" + Should Be Equal As Strings ${item['state']['cmHandleState']} READY + END END Basic Teardown diff --git a/csit/tests/cps-data-sync/cps-data-sync.robot b/csit/tests/cps-data-sync/cps-data-sync.robot index 71de4be1fe..c0ee4da67b 100644 --- a/csit/tests/cps-data-sync/cps-data-sync.robot +++ b/csit/tests/cps-data-sync/cps-data-sync.robot @@ -34,20 +34,40 @@ ${auth} Basic Y3BzdXNlcjpjcHNyMGNrcyE= ${ncmpBasePath} /ncmp *** Test Cases *** + +Check if ietfYang-PNFDemo is READY + ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo + ${headers}= Create Dictionary Authorization=${auth} + Wait Until Keyword Succeeds 10sec 100ms Is CM Handle READY ${uri} ${headers} ietfYang-PNFDemo + Operational state goes to UNSYNCHRONIZED when data sync (flag) is enabled ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/data-sync ${params}= Create Dictionary dataSyncEnabled=true ${headers}= Create Dictionary Authorization=${auth} ${response}= PUT On Session CPS_URL ${uri} params=${params} headers=${headers} Should Be Equal As Strings ${response.status_code} 200 - ${verifyUri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state - ${verifyHeaders}= Create Dictionary Authorization=${auth} - ${verifyResponse}= GET On Session CPS_URL ${verifyUri} headers=${verifyHeaders} - Should Be Equal As Strings ${verifyResponse.json()['state']['dataSyncState']['operational']['syncState']} UNSYNCHRONIZED - Sleep 5 + ${verifyUri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state + ${verifyHeaders}= Create Dictionary Authorization=${auth} + ${verifyResponse}= GET On Session CPS_URL ${verifyUri} headers=${verifyHeaders} + Should Be Equal As Strings ${verifyResponse.json()['state']['dataSyncState']['operational']['syncState']} UNSYNCHRONIZED Operational state goes to SYNCHRONIZED after sometime when data sync (flag) is enabled - ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state - ${headers}= Create Dictionary Authorization=${auth} - ${response}= GET On Session CPS_URL ${uri} headers=${headers} - Should Be Equal As Strings ${response.json()['state']['dataSyncState']['operational']['syncState']} SYNCHRONIZED
\ No newline at end of file + ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state + ${headers}= Create Dictionary Authorization=${auth} + Wait Until Keyword Succeeds 10sec 100ms Is CM Handle State SYNCHRONIZED ${uri} ${headers} + +*** Keywords *** +Is CM Handle READY + [Arguments] ${uri} ${headers} ${cmHandle} + ${response}= GET On Session CPS_URL ${uri} headers=${headers} + Should Be Equal As Strings ${response.status_code} 200 + FOR ${item} IN ${response.json()} + IF "${item['cmHandle']}" == "${cmHandle}" + Should Be Equal As Strings ${item['state']['cmHandleState']} READY + END + END + +Is CM Handle State SYNCHRONIZED + [Arguments] ${uri} ${headers} + ${response}= GET On Session CPS_URL ${uri} headers=${headers} + Should Be Equal As Strings ${response.json()['state']['dataSyncState']['operational']['syncState']} SYNCHRONIZED diff --git a/csit/tests/cps-model-sync/cps-model-sync.robot b/csit/tests/cps-model-sync/cps-model-sync.robot index 704d02c4ba..3e8551f7f5 100644 --- a/csit/tests/cps-model-sync/cps-model-sync.robot +++ b/csit/tests/cps-model-sync/cps-model-sync.robot @@ -85,5 +85,4 @@ Get modules for registered data node IF "${item['moduleName']}" == "stores" Should Be Equal As Strings "${item['revision']}" "2020-09-15" END - END - Sleep 10
\ No newline at end of file + END
\ No newline at end of file diff --git a/csit/tests/cps-trust-level/cps-trust-level.robot b/csit/tests/cps-trust-level/cps-trust-level.robot index 70659000cf..e4deeff32b 100644 --- a/csit/tests/cps-trust-level/cps-trust-level.robot +++ b/csit/tests/cps-trust-level/cps-trust-level.robot @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,14 +44,13 @@ Register data node ${headers}= Create Dictionary Content-Type=application/json Authorization=${auth} ${response}= POST On Session CPS_URL ${uri} headers=${headers} data=${jsonCreateCmHandles} Should Be Equal As Strings ${response.status_code} 200 - Sleep 5 Verify notification ${group_id}= Create Consumer auto_offset_reset=earliest - Subscribe Topic topics=cm-events group_id=${group_id} - ${result}= Poll group_id=${group_id} only_value=False poll_attempts=5 - ${headers} Set Variable ${result[0].headers()} - ${payload} Set Variable ${result[0].value()} + Subscribe Topic topics=cm-events group_id=${group_id} + ${result}= Poll group_id=${group_id} only_value=False poll_attempts=5 + ${headers} Set Variable ${result[0].headers()} + ${payload} Set Variable ${result[0].value()} FOR ${header_key_value_pair} IN @{headers} Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0" Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "NCMP" |