summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy28
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java64
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java10
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java55
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java11
-rw-r--r--cps-ncmp-service/src/main/resources/models/subscription.yang57
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy93
-rw-r--r--cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json23
-rw-r--r--cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json31
-rw-r--r--cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json19
-rw-r--r--cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json31
-rw-r--r--cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json22
-rw-r--r--cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json28
-rw-r--r--cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json20
-rw-r--r--cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java542
-rw-r--r--csit/tests/cps-data-operations/cps-data-operations.robot23
-rw-r--r--csit/tests/cps-data-sync/cps-data-sync.robot38
-rw-r--r--csit/tests/cps-model-sync/cps-model-sync.robot3
-rw-r--r--csit/tests/cps-trust-level/cps-trust-level.robot11
19 files changed, 548 insertions, 561 deletions
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
index ae7c56458d..f06af6c10a 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,19 +37,27 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService)
+ def setup() {
+ objectUnderTest.timeOutInMilliSeconds = 100
+ }
+
def 'Attempt to execute async get request with #scenario.'() {
given: 'notification feature is turned on/off'
objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled
+ and: ' a flag to track the network service call'
+ def networkServiceMethodCalled = false
+ and: 'the (mocked) service will use the flag to indicate if it is called'
+ mockNetworkCmProxyDataService.getResourceDataForCmHandle('ds', 'ch1', 'resource1', 'options', _, _) >> {
+ networkServiceMethodCalled = true
+ }
when: 'get request is executed with topic = #topic'
objectUnderTest.executeRequest('ds', 'ch1', 'resource1', 'options', topic, false)
- and: 'wait a little for async execution (only if expected)'
- if (expectedCalls > 0) {
- Thread.sleep(500)
- }
then: 'the task is executed in an async fashion or not'
expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_)
- /*and: 'the service request is always invoked'
- 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle('ds', 'ch1', 'resource1', 'options', _, _)*/
+ and: 'the service request is always invoked within 1 seconds'
+ new PollingConditions().within(1) {
+ assert networkServiceMethodCalled == true
+ }
where: 'the following parameters are used'
scenario | notificationFeatureEnabled | topic || expectedCalls
'feature on, valid topic' | true | 'valid' || 1
@@ -89,9 +97,9 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
objectUnderTest.executeRequest('myTopic', dataOperationRequest)
then: 'the task is executed in an async fashion'
1 * spiedCpsNcmpTaskExecutor.executeTask(*_)
- and: 'the network service is invoked (wait max. 5 seconds)'
- new PollingConditions(timeout: 30).eventually {
- //TODO Fix test assertion
+ and: 'the network service is invoked within 1 seconds'
+ new PollingConditions().within(1) {
+ assert networkServiceMethodCalled == true
}
where: 'the following datastores are used'
datastore << ['ncmp-datastore:passthrough-running', 'ncmp-datastore:passthrough-operational']
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java
new file mode 100644
index 0000000000..8bc36943a3
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
+
+import io.cloudevents.CloudEvent;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CmSubscriptionNcmpInEventConsumer {
+
+ @Value("${notification.enabled:true}")
+ private boolean notificationFeatureEnabled;
+
+ @Value("${ncmp.model-loader.subscription:false}")
+ private boolean subscriptionModelLoaderEnabled;
+
+ /**
+ * Consume the specified event.
+ *
+ * @param subscriptionEventConsumerRecord the event to be consumed
+ */
+ @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
+ final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
+ final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent =
+ toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class);
+ if (subscriptionModelLoaderEnabled) {
+ log.info("Subscription with name {} to be mapped to hazelcast object...",
+ cmSubscriptionNcmpInEvent.getData().getSubscriptionId());
+ }
+ if ("subscriptionCreated".equals(cloudEvent.getType()) && cmSubscriptionNcmpInEvent != null) {
+ log.info("Subscription for ClientID {} with name {} ...",
+ cloudEvent.getSource(),
+ cmSubscriptionNcmpInEvent.getData().getSubscriptionId());
+ }
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
index a31332f094..0ed95adff2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -164,12 +164,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
}
private void setInitialStates(final YangModelCmHandle yangModelCmHandle) {
- CompositeStateUtils.setInitialDataStoreSyncState().accept(yangModelCmHandle.getCompositeState());
- CompositeStateUtils.setCompositeState(READY).accept(yangModelCmHandle.getCompositeState());
+ CompositeStateUtils.setInitialDataStoreSyncState(yangModelCmHandle.getCompositeState());
+ CompositeStateUtils.setCompositeState(READY, yangModelCmHandle.getCompositeState());
}
private void retryCmHandle(final YangModelCmHandle yangModelCmHandle) {
- CompositeStateUtils.setCompositeStateForRetry().accept(yangModelCmHandle.getCompositeState());
+ CompositeStateUtils.setCompositeStateForRetry(yangModelCmHandle.getCompositeState());
}
private void registerNewCmHandle(final YangModelCmHandle yangModelCmHandle) {
@@ -178,7 +178,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
}
private void setCmHandleState(final YangModelCmHandle yangModelCmHandle, final CmHandleState targetCmHandleState) {
- CompositeStateUtils.setCompositeState(targetCmHandleState).accept(yangModelCmHandle.getCompositeState());
+ CompositeStateUtils.setCompositeState(targetCmHandleState, yangModelCmHandle.getCompositeState());
}
private boolean isNew(final CompositeState existingCompositeState) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java
index 99cca8c0b3..35ad54fdef 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/inventory/CompositeStateUtils.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
package org.onap.cps.ncmp.api.impl.inventory;
-import java.util.function.Consumer;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -34,31 +33,23 @@ public class CompositeStateUtils {
/**
* Sets the cmHandleState to the provided state and updates the timestamp.
- *
- * @return Updated CompositeState
*/
- public static Consumer<CompositeState> setCompositeState(final CmHandleState cmHandleState) {
- return compositeState -> {
- compositeState.setCmHandleState(cmHandleState);
- compositeState.setLastUpdateTimeNow();
- };
+ public static void setCompositeState(final CmHandleState cmHandleState,
+ final CompositeState compositeState) {
+ compositeState.setCmHandleState(cmHandleState);
+ compositeState.setLastUpdateTimeNow();
}
/**
* Set the Operational datastore sync state based on the global flag.
- *
- * @return Updated CompositeState
*/
- public static Consumer<CompositeState> setInitialDataStoreSyncState() {
-
- return compositeState -> {
- compositeState.setDataSyncEnabled(false);
- final CompositeState.Operational operational =
- getInitialDataStoreSyncState(compositeState.getDataSyncEnabled());
- final CompositeState.DataStores dataStores =
- CompositeState.DataStores.builder().operationalDataStore(operational).build();
- compositeState.setDataStores(dataStores);
- };
+ public static void setInitialDataStoreSyncState(final CompositeState compositeState) {
+ compositeState.setDataSyncEnabled(false);
+ final CompositeState.Operational operational =
+ getInitialDataStoreSyncState(compositeState.getDataSyncEnabled());
+ final CompositeState.DataStores dataStores =
+ CompositeState.DataStores.builder().operationalDataStore(operational).build();
+ compositeState.setDataStores(dataStores);
}
/**
@@ -91,19 +82,15 @@ public class CompositeStateUtils {
/**
* Sets the cmHandleState to ADVISED and retain the lock details. Used in retry scenarios.
- *
- * @return Updated CompositeState
*/
- public static Consumer<CompositeState> setCompositeStateForRetry() {
- return compositeState -> {
- compositeState.setCmHandleState(CmHandleState.ADVISED);
- compositeState.setLastUpdateTimeNow();
- final String oldLockReasonDetails = compositeState.getLockReason().getDetails();
- final CompositeState.LockReason lockReason =
- CompositeState.LockReason.builder()
- .lockReasonCategory(compositeState.getLockReason().getLockReasonCategory())
- .details(oldLockReasonDetails).build();
- compositeState.setLockReason(lockReason);
- };
+ public static void setCompositeStateForRetry(final CompositeState compositeState) {
+ compositeState.setCmHandleState(CmHandleState.ADVISED);
+ compositeState.setLastUpdateTimeNow();
+ final String oldLockReasonDetails = compositeState.getLockReason().getDetails();
+ final CompositeState.LockReason lockReason =
+ CompositeState.LockReason.builder()
+ .lockReasonCategory(compositeState.getLockReason().getLockReasonCategory())
+ .details(oldLockReasonDetails).build();
+ compositeState.setLockReason(lockReason);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java
index 81055db847..88ba5e91be 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java
@@ -45,13 +45,6 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader {
private static final String DATASTORE_PASSTHROUGH_OPERATIONAL = "ncmp-datastores:passthrough-operational";
private static final String DATASTORE_PASSTHROUGH_RUNNING = "ncmp-datastores:passthrough-running";
- private static final String DEPRECATED_MODEL_FILENAME = "subscription.yang";
- private static final String DEPRECATED_ANCHOR_NAME = "AVC-Subscriptions";
- private static final String DEPRECATED_SCHEMASET_NAME = "subscriptions";
- private static final String DEPRECATED_REGISTRY_DATANODE_NAME = "subscription-registry";
-
-
-
public CmDataSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService,
final CpsModuleService cpsModuleService,
final CpsAnchorService cpsAnchorService,
@@ -74,10 +67,6 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader {
}
private void onboardSubscriptionModels() {
- createSchemaSet(NCMP_DATASPACE_NAME, DEPRECATED_SCHEMASET_NAME, DEPRECATED_MODEL_FILENAME);
- createAnchor(NCMP_DATASPACE_NAME, DEPRECATED_SCHEMASET_NAME, DEPRECATED_ANCHOR_NAME);
- createTopLevelDataNode(NCMP_DATASPACE_NAME, DEPRECATED_ANCHOR_NAME, DEPRECATED_REGISTRY_DATANODE_NAME);
-
createSchemaSet(NCMP_DATASPACE_NAME, SCHEMASET_NAME, MODEL_FILENAME);
createAnchor(NCMP_DATASPACE_NAME, SCHEMASET_NAME, ANCHOR_NAME);
createTopLevelDataNode(NCMP_DATASPACE_NAME, ANCHOR_NAME, REGISTRY_DATANODE_NAME);
diff --git a/cps-ncmp-service/src/main/resources/models/subscription.yang b/cps-ncmp-service/src/main/resources/models/subscription.yang
deleted file mode 100644
index 7096c18abc..0000000000
--- a/cps-ncmp-service/src/main/resources/models/subscription.yang
+++ /dev/null
@@ -1,57 +0,0 @@
-module subscription {
- yang-version 1.1;
- namespace "org:onap:ncmp:subscription";
-
- prefix subs;
-
- revision "2023-03-21" {
- description
- "NCMP subscription model";
- }
-
- container subscription-registry {
- list subscription {
- key "clientID subscriptionName";
-
- leaf clientID {
- type string;
- }
-
- leaf subscriptionName {
- type string;
- }
-
- leaf topic {
- type string;
- }
-
- leaf isTagged {
- type boolean;
- }
-
- container predicates {
-
- list targetCmHandles {
- key "cmHandleId";
-
- leaf cmHandleId {
- type string;
- }
-
- leaf status {
- type string;
- }
-
- leaf details {
- type string;
- }
- }
-
- leaf datastore {
- type string;
- }
- }
-
- }
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy
new file mode 100644
index 0000000000..57e77eba31
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumerSpec.groovy
@@ -0,0 +1,93 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class CmSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpec {
+
+ def objectUnderTest = new CmSubscriptionNcmpInEventConsumer()
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ @Autowired
+ ObjectMapper objectMapper
+
+ @BeforeEach
+ void setup() {
+ ((Logger) LoggerFactory.getLogger(CmSubscriptionNcmpInEventConsumer.class)).addAppender(logger);
+ logger.start();
+ }
+
+ @AfterEach
+ void teardown() {
+ ((Logger) LoggerFactory.getLogger(CmSubscriptionNcmpInEventConsumer.class)).detachAndStopAllAppenders();
+ }
+
+
+ def 'Consume valid CMSubscription create message'() {
+ given: 'a cmsubscription event'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmSubscriptionNcmpInEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(testEventSent))
+ .withId('subscriptionCreated')
+ .withType('subscriptionCreated')
+ .withSource(URI.create('some-resource'))
+ .withExtension('correlationid', 'test-cmhandle1').build()
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ and: 'notifications are enabled'
+ objectUnderTest.notificationFeatureEnabled = true
+ and: 'subscription model loader is enabled'
+ objectUnderTest.subscriptionModelLoaderEnabled = true
+ when: 'the valid event is consumed'
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
+ then: 'an event is logged with level INFO'
+ def loggingEvent = getLoggingEvent()
+ assert loggingEvent.level == Level.INFO
+ and: 'the log indicates the task completed successfully'
+ assert loggingEvent.formattedMessage == 'Subscription with name cm-subscription-001 to be mapped to hazelcast object...'
+ }
+
+ def getLoggingEvent() {
+ return logger.list[0]
+ }
+
+}
diff --git a/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json
new file mode 100644
index 0000000000..5246618820
--- /dev/null
+++ b/cps-ncmp-service/src/test/resources/cmSubscription/cmSubscriptionNcmpInEvent.json
@@ -0,0 +1,23 @@
+{
+ "data": {
+ "subscriptionId": "cm-subscription-001",
+ "predicates": [
+ {
+ "targetFilter": [
+ "CMHandle1",
+ "CMHandle2",
+ "CMHandle3"
+ ],
+ "scopeFilter": {
+ "datastore": "ncmp-datastore:passthrough-running",
+ "xpath-filter": [
+ "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/",
+ "//_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction//",
+ "//_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU//",
+ "//_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
+ ]
+ }
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json
deleted file mode 100644
index f31362a1c6..0000000000
--- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiInEvent.json
+++ /dev/null
@@ -1,31 +0,0 @@
-{
- "data": {
- "subscription": {
- "clientID": "SCO-9989752",
- "name": "cm-subscription-001"
- },
- "dataType": {
- "dataspace": "ALL",
- "dataCategory": "CM",
- "dataProvider": "CM-SERVICE"
- },
- "predicates": {
- "targets":[
- {
- "id":"CMHandle2",
- "additional-properties":{
- "Books":"Novel"
- }
- },
- {
- "id":"CMHandle1",
- "additional-properties":{
- "Books":"Social Media"
- }
- }
- ],
- "datastore": "passthrough-running",
- "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
- }
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json
deleted file mode 100644
index ae14b5ca21..0000000000
--- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionDmiOutEvent.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
- "data": {
- "clientId": "SCO-9989752",
- "subscriptionName": "cm-subscription-001",
- "dmiName": "dminame1",
- "subscriptionStatus": [
- {
- "id": "CMHandle1",
- "status": "REJECTED",
- "details": "Some error message from the DMI"
- },
- {
- "id": "CMHandle2",
- "status": "REJECTED",
- "details": "Some other error message from the DMI"
- }
- ]
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json
deleted file mode 100644
index c38cb79211..0000000000
--- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionEvent.json
+++ /dev/null
@@ -1,31 +0,0 @@
-{
- "clientId": "SCO-9989752",
- "subscriptionName": "cm-subscription-001",
- "cmSubscriptionStatus": [
- {
- "id": "CMHandle1",
- "status": "REJECTED",
- "details": "Some error message from the DMI"
- },
- {
- "id": "CMHandle2",
- "status": "REJECTED",
- "details": "Some other error message from the DMI"
- },
- {
- "id": "CMHandle3",
- "status": "PENDING",
- "details": "Some error causes pending"
- },
- {
- "id": "CMHandle4",
- "status": "PENDING",
- "details": "Some other error happened"
- },
- {
- "id": "CMHandle5",
- "status": "PENDING",
- "details": "Some other error happened"
- }
- ]
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json
deleted file mode 100644
index 803fa48bdf..0000000000
--- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpInEvent.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "data": {
- "subscription": {
- "clientID": "SCO-9989752",
- "name": "cm-subscription-001"
- },
- "dataType": {
- "dataspace": "ALL",
- "dataCategory": "CM",
- "dataProvider": "CM-SERVICE"
- },
- "predicates": {
- "targets": [
- "CMHandle1",
- "CMHandle2",
- "CMHandle3"
- ],
- "datastore": "ncmp-datastore:passthrough-running",
- "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
- }
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json
deleted file mode 100644
index 856f238c6e..0000000000
--- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "data": {
- "statusCode": 104,
- "statusMessage": "partially applied subscription",
- "additionalInfo": {
- "rejected": [
- {
- "details": "Some other error message from the DMI",
- "targets": ["CMHandle2"]
- },
- {
- "details": "Some error message from the DMI",
- "targets": ["CMHandle1"]
- }
- ],
- "pending": [
- {
- "details": "Some other error happened",
- "targets": ["CMHandle4", "CMHandle5"]
- },
- {
- "details": "Some error causes pending",
- "targets": ["CMHandle3"]
- }
- ]
- }
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json b/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json
deleted file mode 100644
index 35ff0241df..0000000000
--- a/cps-ncmp-service/src/test/resources/deprecatedCmSubscription/cmSubscriptionNcmpOutEvent2.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "data": {
- "statusCode": 104,
- "statusMessage": "partially applied subscription",
- "additionalInfo": {
- "rejected": [
- {
- "details": "Cm handle does not exist",
- "targets": ["CMHandle1"]
- }
- ],
- "pending": [
- {
- "details": "Subscription forwarded to dmi plugin",
- "targets": ["CMHandle3"]
- }
- ]
- }
- }
-} \ No newline at end of file
diff --git a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
index 1cfe21d3a2..fd47793a7a 100644
--- a/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
+++ b/cps-ri/src/main/java/org/onap/cps/spi/impl/CpsDataPersistenceServiceImpl.java
@@ -84,67 +84,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
private static final String REG_EX_FOR_OPTIONAL_LIST_INDEX = "(\\[@.+?])?)";
@Override
- public void addChildDataNodes(final String dataspaceName, final String anchorName,
- final String parentNodeXpath, final Collection<DataNode> dataNodes) {
- final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
- addChildrenDataNodes(anchorEntity, parentNodeXpath, dataNodes);
- }
-
- @Override
- public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath,
- final Collection<DataNode> newListElements) {
- final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
- addChildrenDataNodes(anchorEntity, parentNodeXpath, newListElements);
- }
-
- private void addNewChildDataNode(final AnchorEntity anchorEntity, final String parentNodeXpath,
- final DataNode newChild) {
- final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
- final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(anchorEntity, newChild);
- newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
- try {
- fragmentRepository.save(newChildAsFragmentEntity);
- } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
- throw AlreadyDefinedException.forDataNodes(Collections.singletonList(newChild.getXpath()),
- anchorEntity.getName());
- }
- }
-
- private void addChildrenDataNodes(final AnchorEntity anchorEntity, final String parentNodeXpath,
- final Collection<DataNode> newChildren) {
- final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
- final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size());
- try {
- for (final DataNode newChildAsDataNode : newChildren) {
- final FragmentEntity newChildAsFragmentEntity =
- convertToFragmentWithAllDescendants(anchorEntity, newChildAsDataNode);
- newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
- fragmentEntities.add(newChildAsFragmentEntity);
- }
- fragmentRepository.saveAll(fragmentEntities);
- } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
- log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations",
- dataIntegrityViolationException, fragmentEntities.size());
- retrySavingEachChildIndividually(anchorEntity, parentNodeXpath, newChildren);
- }
- }
-
- private void retrySavingEachChildIndividually(final AnchorEntity anchorEntity, final String parentNodeXpath,
- final Collection<DataNode> newChildren) {
- final Collection<String> failedXpaths = new HashSet<>();
- for (final DataNode newChild : newChildren) {
- try {
- addNewChildDataNode(anchorEntity, parentNodeXpath, newChild);
- } catch (final AlreadyDefinedException alreadyDefinedException) {
- failedXpaths.add(newChild.getXpath());
- }
- }
- if (!failedXpaths.isEmpty()) {
- throw AlreadyDefinedException.forDataNodes(failedXpaths, anchorEntity.getName());
- }
- }
-
- @Override
public void storeDataNodes(final String dataspaceName, final String anchorName,
final Collection<DataNode> dataNodes) {
final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
@@ -157,7 +96,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
fragmentRepository.saveAll(fragmentEntities);
} catch (final DataIntegrityViolationException exception) {
log.warn("Exception occurred : {} , While saving : {} data nodes, Retrying saving data nodes individually",
- exception, dataNodes.size());
+ exception, dataNodes.size());
storeDataNodesIndividually(anchorEntity, dataNodes);
}
}
@@ -197,79 +136,153 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
return parentFragment;
}
- private FragmentEntity toFragmentEntity(final AnchorEntity anchorEntity, final DataNode dataNode) {
- return FragmentEntity.builder()
- .anchor(anchorEntity)
- .xpath(dataNode.getXpath())
- .attributes(jsonObjectMapper.asJsonString(dataNode.getLeaves()))
- .build();
+ @Override
+ public void addListElements(final String dataspaceName, final String anchorName, final String parentNodeXpath,
+ final Collection<DataNode> newListElements) {
+ final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+ addChildrenDataNodes(anchorEntity, parentNodeXpath, newListElements);
}
@Override
- @Timed(value = "cps.data.persistence.service.datanode.get",
- description = "Time taken to get a data node")
- public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
- final String xpath,
- final FetchDescendantsOption fetchDescendantsOption) {
- final String targetXpath = getNormalizedXpath(xpath);
- final Collection<DataNode> dataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName,
- Collections.singletonList(targetXpath), fetchDescendantsOption);
- if (dataNodes.isEmpty()) {
- throw new DataNodeNotFoundException(dataspaceName, anchorName, xpath);
+ public void addChildDataNodes(final String dataspaceName, final String anchorName,
+ final String parentNodeXpath, final Collection<DataNode> dataNodes) {
+ final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+ addChildrenDataNodes(anchorEntity, parentNodeXpath, dataNodes);
+ }
+
+ private void addChildrenDataNodes(final AnchorEntity anchorEntity, final String parentNodeXpath,
+ final Collection<DataNode> newChildren) {
+ final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
+ final List<FragmentEntity> fragmentEntities = new ArrayList<>(newChildren.size());
+ try {
+ for (final DataNode newChildAsDataNode : newChildren) {
+ final FragmentEntity newChildAsFragmentEntity =
+ convertToFragmentWithAllDescendants(anchorEntity, newChildAsDataNode);
+ newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
+ fragmentEntities.add(newChildAsFragmentEntity);
+ }
+ fragmentRepository.saveAll(fragmentEntities);
+ } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
+ log.warn("Exception occurred : {} , While saving : {} children, retrying using individual save operations",
+ dataIntegrityViolationException, fragmentEntities.size());
+ retrySavingEachChildIndividually(anchorEntity, parentNodeXpath, newChildren);
}
- return dataNodes;
}
- @Override
- @Timed(value = "cps.data.persistence.service.datanode.batch.get",
- description = "Time taken to get data nodes")
- public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
- final Collection<String> xpaths,
- final FetchDescendantsOption fetchDescendantsOption) {
- final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
- Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpaths);
- fragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(fetchDescendantsOption,
- fragmentEntities);
- return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities);
+ private void addNewChildDataNode(final AnchorEntity anchorEntity, final String parentNodeXpath,
+ final DataNode newChild) {
+ final FragmentEntity parentFragmentEntity = getFragmentEntity(anchorEntity, parentNodeXpath);
+ final FragmentEntity newChildAsFragmentEntity = convertToFragmentWithAllDescendants(anchorEntity, newChild);
+ newChildAsFragmentEntity.setParentId(parentFragmentEntity.getId());
+ try {
+ fragmentRepository.save(newChildAsFragmentEntity);
+ } catch (final DataIntegrityViolationException dataIntegrityViolationException) {
+ throw AlreadyDefinedException.forDataNodes(Collections.singletonList(newChild.getXpath()),
+ anchorEntity.getName());
+ }
}
- private Collection<FragmentEntity> getFragmentEntities(final AnchorEntity anchorEntity,
- final Collection<String> xpaths) {
- final Collection<String> normalizedXpaths = getNormalizedXpaths(xpaths);
+ private void retrySavingEachChildIndividually(final AnchorEntity anchorEntity, final String parentNodeXpath,
+ final Collection<DataNode> newChildren) {
+ final Collection<String> failedXpaths = new HashSet<>();
+ for (final DataNode newChild : newChildren) {
+ try {
+ addNewChildDataNode(anchorEntity, parentNodeXpath, newChild);
+ } catch (final AlreadyDefinedException alreadyDefinedException) {
+ failedXpaths.add(newChild.getXpath());
+ }
+ }
+ if (!failedXpaths.isEmpty()) {
+ throw AlreadyDefinedException.forDataNodes(failedXpaths, anchorEntity.getName());
+ }
+ }
- final boolean haveRootXpath = normalizedXpaths.removeIf(CpsDataPersistenceServiceImpl::isRootXpath);
+ @Override
+ public void batchUpdateDataLeaves(final String dataspaceName, final String anchorName,
+ final Map<String, Map<String, Serializable>> updatedLeavesPerXPath) {
+ final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
- final List<FragmentEntity> fragmentEntities = fragmentRepository.findByAnchorAndXpathIn(anchorEntity,
- normalizedXpaths);
+ final Collection<String> xpathsOfUpdatedLeaves = updatedLeavesPerXPath.keySet();
+ final Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpathsOfUpdatedLeaves);
for (final FragmentEntity fragmentEntity : fragmentEntities) {
- normalizedXpaths.remove(fragmentEntity.getXpath());
+ final Map<String, Serializable> updatedLeaves = updatedLeavesPerXPath.get(fragmentEntity.getXpath());
+ final String mergedLeaves = mergeLeaves(updatedLeaves, fragmentEntity.getAttributes());
+ fragmentEntity.setAttributes(mergedLeaves);
}
- for (final String xpath : normalizedXpaths) {
- if (!CpsPathUtil.isPathToListElement(xpath)) {
- fragmentEntities.addAll(fragmentRepository.findListByAnchorAndXpath(anchorEntity, xpath));
- }
+ try {
+ fragmentRepository.saveAll(fragmentEntities);
+ } catch (final StaleStateException staleStateException) {
+ retryUpdateDataNodesIndividually(anchorEntity, fragmentEntities);
}
+ }
- if (haveRootXpath) {
- fragmentEntities.addAll(fragmentRepository.findRootsByAnchorId(anchorEntity.getId()));
+ @Override
+ public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
+ final Collection<DataNode> updatedDataNodes) {
+ final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+
+ final Map<String, DataNode> xpathToUpdatedDataNode = updatedDataNodes.stream()
+ .collect(Collectors.toMap(DataNode::getXpath, dataNode -> dataNode));
+
+ final Collection<String> xpaths = xpathToUpdatedDataNode.keySet();
+ Collection<FragmentEntity> existingFragmentEntities = getFragmentEntities(anchorEntity, xpaths);
+ existingFragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(
+ FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS, existingFragmentEntities);
+
+ for (final FragmentEntity existingFragmentEntity : existingFragmentEntities) {
+ final DataNode updatedDataNode = xpathToUpdatedDataNode.get(existingFragmentEntity.getXpath());
+ updateFragmentEntityAndDescendantsWithDataNode(existingFragmentEntity, updatedDataNode);
}
- return fragmentEntities;
+ try {
+ fragmentRepository.saveAll(existingFragmentEntities);
+ } catch (final StaleStateException staleStateException) {
+ retryUpdateDataNodesIndividually(anchorEntity, existingFragmentEntities);
+ }
}
- private FragmentEntity getFragmentEntity(final AnchorEntity anchorEntity, final String xpath) {
- final FragmentEntity fragmentEntity;
- if (isRootXpath(xpath)) {
- fragmentEntity = fragmentRepository.findOneByAnchorId(anchorEntity.getId()).orElse(null);
- } else {
- fragmentEntity = fragmentRepository.getByAnchorAndXpath(anchorEntity, getNormalizedXpath(xpath));
+ private void retryUpdateDataNodesIndividually(final AnchorEntity anchorEntity,
+ final Collection<FragmentEntity> fragmentEntities) {
+ final Collection<String> failedXpaths = new HashSet<>();
+ for (final FragmentEntity dataNodeFragment : fragmentEntities) {
+ try {
+ fragmentRepository.save(dataNodeFragment);
+ } catch (final StaleStateException staleStateException) {
+ failedXpaths.add(dataNodeFragment.getXpath());
+ }
}
- if (fragmentEntity == null) {
- throw new DataNodeNotFoundException(anchorEntity.getDataspace().getName(), anchorEntity.getName(), xpath);
+ if (!failedXpaths.isEmpty()) {
+ final String failedXpathsConcatenated = String.join(",", failedXpaths);
+ throw new ConcurrencyException("Concurrent Transactions", String.format(
+ "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.",
+ failedXpathsConcatenated, anchorEntity.getDataspace().getName(), anchorEntity.getName()));
}
- return fragmentEntity;
+ }
+
+ private void updateFragmentEntityAndDescendantsWithDataNode(final FragmentEntity existingFragmentEntity,
+ final DataNode newDataNode) {
+ copyAttributesFromNewDataNode(existingFragmentEntity, newDataNode);
+
+ final Map<String, FragmentEntity> existingChildrenByXpath = existingFragmentEntity.getChildFragments().stream()
+ .collect(Collectors.toMap(FragmentEntity::getXpath, childFragmentEntity -> childFragmentEntity));
+
+ final Collection<FragmentEntity> updatedChildFragments = new HashSet<>();
+ for (final DataNode newDataNodeChild : newDataNode.getChildDataNodes()) {
+ final FragmentEntity childFragment;
+ if (isNewDataNode(newDataNodeChild, existingChildrenByXpath)) {
+ childFragment = convertToFragmentWithAllDescendants(existingFragmentEntity.getAnchor(),
+ newDataNodeChild);
+ } else {
+ childFragment = existingChildrenByXpath.get(newDataNodeChild.getXpath());
+ updateFragmentEntityAndDescendantsWithDataNode(childFragment, newDataNodeChild);
+ }
+ updatedChildFragments.add(childFragment);
+ }
+
+ existingFragmentEntity.getChildFragments().clear();
+ existingFragmentEntity.getChildFragments().addAll(updatedChildFragments);
}
@Override
@@ -338,11 +351,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities);
}
- private List<Long> getAnchorIdsForPagination(final DataspaceEntity dataspaceEntity, final CpsPathQuery cpsPathQuery,
- final PaginationOption paginationOption) {
- return fragmentRepository.findAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption);
- }
-
private List<DataNode> createDataNodesFromFragmentEntities(final FetchDescendantsOption fetchDescendantsOption,
final Collection<FragmentEntity> fragmentEntities) {
final List<DataNode> dataNodes = new ArrayList<>(fragmentEntities.size());
@@ -352,29 +360,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
return Collections.unmodifiableList(dataNodes);
}
- private static String getNormalizedXpath(final String xpathSource) {
- if (isRootXpath(xpathSource)) {
- return xpathSource;
- }
- try {
- return CpsPathUtil.getNormalizedXpath(xpathSource);
- } catch (final PathParsingException pathParsingException) {
- throw new CpsPathException(pathParsingException.getMessage());
- }
- }
-
- private static Collection<String> getNormalizedXpaths(final Collection<String> xpaths) {
- final Collection<String> normalizedXpaths = new HashSet<>(xpaths.size());
- for (final String xpath : xpaths) {
- try {
- normalizedXpaths.add(getNormalizedXpath(xpath));
- } catch (final CpsPathException cpsPathException) {
- log.warn("Error parsing xpath \"{}\": {}", xpath, cpsPathException.getMessage());
- }
- }
- return normalizedXpaths;
- }
-
@Override
public String startSession() {
return sessionManager.startSession();
@@ -404,21 +389,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
return anchorIdList.size();
}
- private static Set<String> processAncestorXpath(final Collection<FragmentEntity> fragmentEntities,
- final CpsPathQuery cpsPathQuery) {
- final Set<String> ancestorXpath = new HashSet<>();
- final Pattern pattern =
- Pattern.compile("(.*/" + Pattern.quote(cpsPathQuery.getAncestorSchemaNodeIdentifier())
- + REG_EX_FOR_OPTIONAL_LIST_INDEX + "/.*");
- for (final FragmentEntity fragmentEntity : fragmentEntities) {
- final Matcher matcher = pattern.matcher(fragmentEntity.getXpath());
- if (matcher.matches()) {
- ancestorXpath.add(matcher.group(1));
- }
- }
- return ancestorXpath;
- }
-
private DataNode toDataNode(final FragmentEntity fragmentEntity,
final FetchDescendantsOption fetchDescendantsOption) {
final List<DataNode> childDataNodes = getChildDataNodes(fragmentEntity, fetchDescendantsOption);
@@ -434,103 +404,15 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
.withChildDataNodes(childDataNodes).build();
}
- private List<DataNode> getChildDataNodes(final FragmentEntity fragmentEntity,
- final FetchDescendantsOption fetchDescendantsOption) {
- if (fetchDescendantsOption.hasNext()) {
- return fragmentEntity.getChildFragments().stream()
- .map(childFragmentEntity -> toDataNode(childFragmentEntity, fetchDescendantsOption.next()))
- .collect(Collectors.toList());
- }
- return Collections.emptyList();
- }
-
- @Override
- public void batchUpdateDataLeaves(final String dataspaceName, final String anchorName,
- final Map<String, Map<String, Serializable>> updatedLeavesPerXPath) {
- final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-
- final Collection<String> xpathsOfUpdatedLeaves = updatedLeavesPerXPath.keySet();
- final Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpathsOfUpdatedLeaves);
-
- for (final FragmentEntity fragmentEntity : fragmentEntities) {
- final Map<String, Serializable> updatedLeaves = updatedLeavesPerXPath.get(fragmentEntity.getXpath());
- final String mergedLeaves = mergeLeaves(updatedLeaves, fragmentEntity.getAttributes());
- fragmentEntity.setAttributes(mergedLeaves);
- }
-
- try {
- fragmentRepository.saveAll(fragmentEntities);
- } catch (final StaleStateException staleStateException) {
- retryUpdateDataNodesIndividually(anchorEntity, fragmentEntities);
- }
- }
-
- @Override
- public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
- final Collection<DataNode> updatedDataNodes) {
- final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
-
- final Map<String, DataNode> xpathToUpdatedDataNode = updatedDataNodes.stream()
- .collect(Collectors.toMap(DataNode::getXpath, dataNode -> dataNode));
-
- final Collection<String> xpaths = xpathToUpdatedDataNode.keySet();
- Collection<FragmentEntity> existingFragmentEntities = getFragmentEntities(anchorEntity, xpaths);
- existingFragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(
- FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS, existingFragmentEntities);
-
- for (final FragmentEntity existingFragmentEntity : existingFragmentEntities) {
- final DataNode updatedDataNode = xpathToUpdatedDataNode.get(existingFragmentEntity.getXpath());
- updateFragmentEntityAndDescendantsWithDataNode(existingFragmentEntity, updatedDataNode);
- }
-
- try {
- fragmentRepository.saveAll(existingFragmentEntities);
- } catch (final StaleStateException staleStateException) {
- retryUpdateDataNodesIndividually(anchorEntity, existingFragmentEntities);
- }
- }
-
- private void retryUpdateDataNodesIndividually(final AnchorEntity anchorEntity,
- final Collection<FragmentEntity> fragmentEntities) {
- final Collection<String> failedXpaths = new HashSet<>();
- for (final FragmentEntity dataNodeFragment : fragmentEntities) {
- try {
- fragmentRepository.save(dataNodeFragment);
- } catch (final StaleStateException staleStateException) {
- failedXpaths.add(dataNodeFragment.getXpath());
- }
- }
- if (!failedXpaths.isEmpty()) {
- final String failedXpathsConcatenated = String.join(",", failedXpaths);
- throw new ConcurrencyException("Concurrent Transactions", String.format(
- "DataNodes : %s in Dataspace :'%s' with Anchor : '%s' are updated by another transaction.",
- failedXpathsConcatenated, anchorEntity.getDataspace().getName(), anchorEntity.getName()));
- }
+ private FragmentEntity toFragmentEntity(final AnchorEntity anchorEntity, final DataNode dataNode) {
+ return FragmentEntity.builder()
+ .anchor(anchorEntity)
+ .xpath(dataNode.getXpath())
+ .attributes(jsonObjectMapper.asJsonString(dataNode.getLeaves()))
+ .build();
}
- private void updateFragmentEntityAndDescendantsWithDataNode(final FragmentEntity existingFragmentEntity,
- final DataNode newDataNode) {
- copyAttributesFromNewDataNode(existingFragmentEntity, newDataNode);
-
- final Map<String, FragmentEntity> existingChildrenByXpath = existingFragmentEntity.getChildFragments().stream()
- .collect(Collectors.toMap(FragmentEntity::getXpath, childFragmentEntity -> childFragmentEntity));
-
- final Collection<FragmentEntity> updatedChildFragments = new HashSet<>();
- for (final DataNode newDataNodeChild : newDataNode.getChildDataNodes()) {
- final FragmentEntity childFragment;
- if (isNewDataNode(newDataNodeChild, existingChildrenByXpath)) {
- childFragment = convertToFragmentWithAllDescendants(existingFragmentEntity.getAnchor(),
- newDataNodeChild);
- } else {
- childFragment = existingChildrenByXpath.get(newDataNodeChild.getXpath());
- updateFragmentEntityAndDescendantsWithDataNode(childFragment, newDataNodeChild);
- }
- updatedChildFragments.add(childFragment);
- }
- existingFragmentEntity.getChildFragments().clear();
- existingFragmentEntity.getChildFragments().addAll(updatedChildFragments);
- }
@Override
@Transactional
@@ -636,6 +518,116 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
}
}
+ @Override
+ @Timed(value = "cps.data.persistence.service.datanode.get",
+ description = "Time taken to get a data node")
+ public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
+ final String xpath,
+ final FetchDescendantsOption fetchDescendantsOption) {
+ final String targetXpath = getNormalizedXpath(xpath);
+ final Collection<DataNode> dataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName,
+ Collections.singletonList(targetXpath), fetchDescendantsOption);
+ if (dataNodes.isEmpty()) {
+ throw new DataNodeNotFoundException(dataspaceName, anchorName, xpath);
+ }
+ return dataNodes;
+ }
+
+ @Override
+ @Timed(value = "cps.data.persistence.service.datanode.batch.get",
+ description = "Time taken to get data nodes")
+ public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
+ final Collection<String> xpaths,
+ final FetchDescendantsOption fetchDescendantsOption) {
+ final AnchorEntity anchorEntity = getAnchorEntity(dataspaceName, anchorName);
+ Collection<FragmentEntity> fragmentEntities = getFragmentEntities(anchorEntity, xpaths);
+ fragmentEntities = fragmentRepository.prefetchDescendantsOfFragmentEntities(fetchDescendantsOption,
+ fragmentEntities);
+ return createDataNodesFromFragmentEntities(fetchDescendantsOption, fragmentEntities);
+ }
+
+ private List<DataNode> getChildDataNodes(final FragmentEntity fragmentEntity,
+ final FetchDescendantsOption fetchDescendantsOption) {
+ if (fetchDescendantsOption.hasNext()) {
+ return fragmentEntity.getChildFragments().stream()
+ .map(childFragmentEntity -> toDataNode(childFragmentEntity, fetchDescendantsOption.next()))
+ .collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }
+
+ private AnchorEntity getAnchorEntity(final String dataspaceName, final String anchorName) {
+ final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
+ return anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName);
+ }
+
+ private List<Long> getAnchorIdsForPagination(final DataspaceEntity dataspaceEntity, final CpsPathQuery cpsPathQuery,
+ final PaginationOption paginationOption) {
+ return fragmentRepository.findAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption);
+ }
+
+ private static String getNormalizedXpath(final String xpathSource) {
+ if (isRootXpath(xpathSource)) {
+ return xpathSource;
+ }
+ try {
+ return CpsPathUtil.getNormalizedXpath(xpathSource);
+ } catch (final PathParsingException pathParsingException) {
+ throw new CpsPathException(pathParsingException.getMessage());
+ }
+ }
+
+ private static Collection<String> getNormalizedXpaths(final Collection<String> xpaths) {
+ final Collection<String> normalizedXpaths = new HashSet<>(xpaths.size());
+ for (final String xpath : xpaths) {
+ try {
+ normalizedXpaths.add(getNormalizedXpath(xpath));
+ } catch (final CpsPathException cpsPathException) {
+ log.warn("Error parsing xpath \"{}\": {}", xpath, cpsPathException.getMessage());
+ }
+ }
+ return normalizedXpaths;
+ }
+
+ private FragmentEntity getFragmentEntity(final AnchorEntity anchorEntity, final String xpath) {
+ final FragmentEntity fragmentEntity;
+ if (isRootXpath(xpath)) {
+ fragmentEntity = fragmentRepository.findOneByAnchorId(anchorEntity.getId()).orElse(null);
+ } else {
+ fragmentEntity = fragmentRepository.getByAnchorAndXpath(anchorEntity, getNormalizedXpath(xpath));
+ }
+ if (fragmentEntity == null) {
+ throw new DataNodeNotFoundException(anchorEntity.getDataspace().getName(), anchorEntity.getName(), xpath);
+ }
+ return fragmentEntity;
+ }
+
+ private Collection<FragmentEntity> getFragmentEntities(final AnchorEntity anchorEntity,
+ final Collection<String> xpaths) {
+ final Collection<String> normalizedXpaths = getNormalizedXpaths(xpaths);
+
+ final boolean haveRootXpath = normalizedXpaths.removeIf(CpsDataPersistenceServiceImpl::isRootXpath);
+
+ final List<FragmentEntity> fragmentEntities = fragmentRepository.findByAnchorAndXpathIn(anchorEntity,
+ normalizedXpaths);
+
+ for (final FragmentEntity fragmentEntity : fragmentEntities) {
+ normalizedXpaths.remove(fragmentEntity.getXpath());
+ }
+
+ for (final String xpath : normalizedXpaths) {
+ if (!CpsPathUtil.isPathToListElement(xpath)) {
+ fragmentEntities.addAll(fragmentRepository.findListByAnchorAndXpath(anchorEntity, xpath));
+ }
+ }
+
+ if (haveRootXpath) {
+ fragmentEntities.addAll(fragmentRepository.findRootsByAnchorId(anchorEntity.getId()));
+ }
+
+ return fragmentEntities;
+ }
+
private static String getListElementXpathPrefix(final Collection<DataNode> newListElements) {
if (newListElements.isEmpty()) {
throw new CpsAdminException("Invalid list replacement",
@@ -660,20 +652,6 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
return existingListElementEntity;
}
- private static boolean isNewDataNode(final DataNode replacementDataNode,
- final Map<String, FragmentEntity> existingListElementsByXpath) {
- return !existingListElementsByXpath.containsKey(replacementDataNode.getXpath());
- }
-
- private void copyAttributesFromNewDataNode(final FragmentEntity existingFragmentEntity,
- final DataNode newDataNode) {
- final String oldOrderedLeavesAsJson = getOrderedLeavesAsJson(existingFragmentEntity.getAttributes());
- final String newOrderedLeavesAsJson = getOrderedLeavesAsJson(newDataNode.getLeaves());
- if (!oldOrderedLeavesAsJson.equals(newOrderedLeavesAsJson)) {
- existingFragmentEntity.setAttributes(jsonObjectMapper.asJsonString(newDataNode.getLeaves()));
- }
- }
-
private String getOrderedLeavesAsJson(final Map<String, Serializable> currentLeaves) {
final Map<String, Serializable> sortedLeaves = new TreeMap<>(String::compareTo);
sortedLeaves.putAll(currentLeaves);
@@ -685,7 +663,7 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
return "{}";
}
final Map<String, Serializable> sortedLeaves = jsonObjectMapper.convertJsonString(currentLeavesAsString,
- TreeMap.class);
+ TreeMap.class);
return jsonObjectMapper.asJsonString(sortedLeaves);
}
@@ -696,10 +674,39 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
.collect(Collectors.toMap(FragmentEntity::getXpath, fragmentEntity -> fragmentEntity));
}
+ private static Set<String> processAncestorXpath(final Collection<FragmentEntity> fragmentEntities,
+ final CpsPathQuery cpsPathQuery) {
+ final Set<String> ancestorXpath = new HashSet<>();
+ final Pattern pattern =
+ Pattern.compile("(.*/" + Pattern.quote(cpsPathQuery.getAncestorSchemaNodeIdentifier())
+ + REG_EX_FOR_OPTIONAL_LIST_INDEX + "/.*");
+ for (final FragmentEntity fragmentEntity : fragmentEntities) {
+ final Matcher matcher = pattern.matcher(fragmentEntity.getXpath());
+ if (matcher.matches()) {
+ ancestorXpath.add(matcher.group(1));
+ }
+ }
+ return ancestorXpath;
+ }
+
private static boolean isRootXpath(final String xpath) {
return "/".equals(xpath) || "".equals(xpath);
}
+ private static boolean isNewDataNode(final DataNode replacementDataNode,
+ final Map<String, FragmentEntity> existingListElementsByXpath) {
+ return !existingListElementsByXpath.containsKey(replacementDataNode.getXpath());
+ }
+
+ private void copyAttributesFromNewDataNode(final FragmentEntity existingFragmentEntity,
+ final DataNode newDataNode) {
+ final String oldOrderedLeavesAsJson = getOrderedLeavesAsJson(existingFragmentEntity.getAttributes());
+ final String newOrderedLeavesAsJson = getOrderedLeavesAsJson(newDataNode.getLeaves());
+ if (!oldOrderedLeavesAsJson.equals(newOrderedLeavesAsJson)) {
+ existingFragmentEntity.setAttributes(jsonObjectMapper.asJsonString(newDataNode.getLeaves()));
+ }
+ }
+
private String mergeLeaves(final Map<String, Serializable> updateLeaves, final String currentLeavesAsString) {
Map<String, Serializable> currentLeavesAsMap = new HashMap<>();
if (currentLeavesAsString != null) {
@@ -712,9 +719,4 @@ public class CpsDataPersistenceServiceImpl implements CpsDataPersistenceService
}
return jsonObjectMapper.asJsonString(currentLeavesAsMap);
}
-
- private AnchorEntity getAnchorEntity(final String dataspaceName, final String anchorName) {
- final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
- return anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName);
- }
}
diff --git a/csit/tests/cps-data-operations/cps-data-operations.robot b/csit/tests/cps-data-operations/cps-data-operations.robot
index 1f6611955e..17dce16446 100644
--- a/csit/tests/cps-data-operations/cps-data-operations.robot
+++ b/csit/tests/cps-data-operations/cps-data-operations.robot
@@ -46,18 +46,19 @@ NCMP Data Operation, forwarded to DMI, response on Client Topic
${params}= Create Dictionary topic=${topic}
${headers}= Create Dictionary Content-Type=application/json Authorization=${auth}
POST On Session CPS_URL ncmpInventory/v1/ch headers=${headers} data=${newCmHandleRequestBody}
- Sleep 8 wait some time to get updated the cm handle state to READY
+ ${getCmHandleUri}= Set Variable ${ncmpBasePath}/v1/ch/CMHandle1
+ ${getCmHandleHeaders}= Create Dictionary Authorization=${auth}
+ Wait Until Keyword Succeeds 8sec 100ms Is CM Handle READY ${getCmHandleUri} ${getCmHandleHeaders} CMHandle1
${response}= POST On Session CPS_URL ${uri} params=${params} headers=${headers} data=${dataOperationReqBody}
Set Global Variable ${expectedRequestId} ${response.json()}[requestId]
Should Be Equal As Strings ${response.status_code} 200
- Sleep 5 wait some time to get published a message to the client topic
Consume cloud event from client topic
${group_id}= Create Consumer auto_offset_reset=earliest
Subscribe Topic topics=${topic} group_id=${group_id}
${messages}= Poll group_id=${group_id} only_value=false
- ${event} Set Variable ${messages}[0]
- ${headers} Set Variable ${event.headers()}
+ ${event} Set Variable ${messages}[0]
+ ${headers} Set Variable ${event.headers()}
FOR ${header_key_value_pair} IN @{headers}
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0"
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_type" "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"
@@ -68,9 +69,19 @@ Consume cloud event from client topic
*** Keywords ***
Compare Header Values
- [Arguments] ${header_key} ${header_value} ${header_to_check} ${expected_header_value}
+ [Arguments] ${header_key} ${header_value} ${header_to_check} ${expected_header_value}
IF "${header_key}" == ${header_to_check}
- Should Be Equal As Strings "${header_value}" ${expected_header_value}
+ Should Be Equal As Strings "${header_value}" ${expected_header_value}
+ END
+
+Is CM Handle READY
+ [Arguments] ${uri} ${headers} ${cmHandle}
+ ${response}= GET On Session CPS_URL ${uri} headers=${headers}
+ Should Be Equal As Strings ${response.status_code} 200
+ FOR ${item} IN ${response.json()}
+ IF "${item['cmHandle']}" == "${cmHandle}"
+ Should Be Equal As Strings ${item['state']['cmHandleState']} READY
+ END
END
Basic Teardown
diff --git a/csit/tests/cps-data-sync/cps-data-sync.robot b/csit/tests/cps-data-sync/cps-data-sync.robot
index 71de4be1fe..c0ee4da67b 100644
--- a/csit/tests/cps-data-sync/cps-data-sync.robot
+++ b/csit/tests/cps-data-sync/cps-data-sync.robot
@@ -34,20 +34,40 @@ ${auth} Basic Y3BzdXNlcjpjcHNyMGNrcyE=
${ncmpBasePath} /ncmp
*** Test Cases ***
+
+Check if ietfYang-PNFDemo is READY
+ ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo
+ ${headers}= Create Dictionary Authorization=${auth}
+ Wait Until Keyword Succeeds 10sec 100ms Is CM Handle READY ${uri} ${headers} ietfYang-PNFDemo
+
Operational state goes to UNSYNCHRONIZED when data sync (flag) is enabled
${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/data-sync
${params}= Create Dictionary dataSyncEnabled=true
${headers}= Create Dictionary Authorization=${auth}
${response}= PUT On Session CPS_URL ${uri} params=${params} headers=${headers}
Should Be Equal As Strings ${response.status_code} 200
- ${verifyUri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
- ${verifyHeaders}= Create Dictionary Authorization=${auth}
- ${verifyResponse}= GET On Session CPS_URL ${verifyUri} headers=${verifyHeaders}
- Should Be Equal As Strings ${verifyResponse.json()['state']['dataSyncState']['operational']['syncState']} UNSYNCHRONIZED
- Sleep 5
+ ${verifyUri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
+ ${verifyHeaders}= Create Dictionary Authorization=${auth}
+ ${verifyResponse}= GET On Session CPS_URL ${verifyUri} headers=${verifyHeaders}
+ Should Be Equal As Strings ${verifyResponse.json()['state']['dataSyncState']['operational']['syncState']} UNSYNCHRONIZED
Operational state goes to SYNCHRONIZED after sometime when data sync (flag) is enabled
- ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
- ${headers}= Create Dictionary Authorization=${auth}
- ${response}= GET On Session CPS_URL ${uri} headers=${headers}
- Should Be Equal As Strings ${response.json()['state']['dataSyncState']['operational']['syncState']} SYNCHRONIZED \ No newline at end of file
+ ${uri}= Set Variable ${ncmpBasePath}/v1/ch/ietfYang-PNFDemo/state
+ ${headers}= Create Dictionary Authorization=${auth}
+ Wait Until Keyword Succeeds 10sec 100ms Is CM Handle State SYNCHRONIZED ${uri} ${headers}
+
+*** Keywords ***
+Is CM Handle READY
+ [Arguments] ${uri} ${headers} ${cmHandle}
+ ${response}= GET On Session CPS_URL ${uri} headers=${headers}
+ Should Be Equal As Strings ${response.status_code} 200
+ FOR ${item} IN ${response.json()}
+ IF "${item['cmHandle']}" == "${cmHandle}"
+ Should Be Equal As Strings ${item['state']['cmHandleState']} READY
+ END
+ END
+
+Is CM Handle State SYNCHRONIZED
+ [Arguments] ${uri} ${headers}
+ ${response}= GET On Session CPS_URL ${uri} headers=${headers}
+ Should Be Equal As Strings ${response.json()['state']['dataSyncState']['operational']['syncState']} SYNCHRONIZED
diff --git a/csit/tests/cps-model-sync/cps-model-sync.robot b/csit/tests/cps-model-sync/cps-model-sync.robot
index 704d02c4ba..3e8551f7f5 100644
--- a/csit/tests/cps-model-sync/cps-model-sync.robot
+++ b/csit/tests/cps-model-sync/cps-model-sync.robot
@@ -85,5 +85,4 @@ Get modules for registered data node
IF "${item['moduleName']}" == "stores"
Should Be Equal As Strings "${item['revision']}" "2020-09-15"
END
- END
- Sleep 10 \ No newline at end of file
+ END \ No newline at end of file
diff --git a/csit/tests/cps-trust-level/cps-trust-level.robot b/csit/tests/cps-trust-level/cps-trust-level.robot
index 70659000cf..e4deeff32b 100644
--- a/csit/tests/cps-trust-level/cps-trust-level.robot
+++ b/csit/tests/cps-trust-level/cps-trust-level.robot
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,14 +44,13 @@ Register data node
${headers}= Create Dictionary Content-Type=application/json Authorization=${auth}
${response}= POST On Session CPS_URL ${uri} headers=${headers} data=${jsonCreateCmHandles}
Should Be Equal As Strings ${response.status_code} 200
- Sleep 5
Verify notification
${group_id}= Create Consumer auto_offset_reset=earliest
- Subscribe Topic topics=cm-events group_id=${group_id}
- ${result}= Poll group_id=${group_id} only_value=False poll_attempts=5
- ${headers} Set Variable ${result[0].headers()}
- ${payload} Set Variable ${result[0].value()}
+ Subscribe Topic topics=cm-events group_id=${group_id}
+ ${result}= Poll group_id=${group_id} only_value=False poll_attempts=5
+ ${headers} Set Variable ${result[0].headers()}
+ ${payload} Set Variable ${result[0].value()}
FOR ${header_key_value_pair} IN @{headers}
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0"
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "NCMP"