summaryrefslogtreecommitdiffstats
path: root/cps-service/src
diff options
context:
space:
mode:
authorToine Siebelink <toine.siebelink@est.tech>2023-12-18 15:16:26 +0000
committerGerrit Code Review <gerrit@onap.org>2023-12-18 15:16:26 +0000
commit597d4ef72e5388092cc80ae662af6711f735369f (patch)
tree4ad76262d8ff886c96fe7fe5cbcbfa830f9004d4 /cps-service/src
parent66d033e5bb4317d02d343fc3a5f50dde2d4097d6 (diff)
parentdb6b8f845ac72da59c12831ae8c7efa180f9ace2 (diff)
Merge "Remove Notification code for updated events"
Diffstat (limited to 'cps-service/src')
-rwxr-xr-xcps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java40
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java108
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java52
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java40
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java43
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java66
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationService.java124
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/Operation.java27
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy56
-rwxr-xr-xcps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy5
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy46
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy142
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy93
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy49
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy60
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy91
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy158
17 files changed, 51 insertions, 1149 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
index e74e0ad249..6672d6883f 100755
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
@@ -24,10 +24,6 @@
package org.onap.cps.api.impl;
-import static org.onap.cps.notification.Operation.CREATE;
-import static org.onap.cps.notification.Operation.DELETE;
-import static org.onap.cps.notification.Operation.UPDATE;
-
import io.micrometer.core.annotation.Timed;
import java.io.Serializable;
import java.time.OffsetDateTime;
@@ -43,8 +39,6 @@ import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsDeltaService;
import org.onap.cps.cpspath.parser.CpsPathUtil;
-import org.onap.cps.notification.NotificationService;
-import org.onap.cps.notification.Operation;
import org.onap.cps.spi.CpsDataPersistenceService;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.exceptions.DataValidationException;
@@ -70,7 +64,6 @@ public class CpsDataServiceImpl implements CpsDataService {
private final CpsDataPersistenceService cpsDataPersistenceService;
private final CpsAdminService cpsAdminService;
private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
- private final NotificationService notificationService;
private final CpsValidator cpsValidator;
private final TimedYangParser timedYangParser;
private final CpsDeltaService cpsDeltaService;
@@ -90,7 +83,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
- processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
}
@Override
@@ -109,7 +101,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
}
@Override
@@ -127,7 +118,6 @@ public class CpsDataServiceImpl implements CpsDataService {
cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollection);
}
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -141,7 +131,6 @@ public class CpsDataServiceImpl implements CpsDataService {
buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollections);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -177,7 +166,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
.collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -194,7 +182,6 @@ public class CpsDataServiceImpl implements CpsDataService {
for (final DataNode dataNodeUpdate : dataNodeUpdates) {
processDataNodeUpdate(anchor, dataNodeUpdate);
}
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -244,7 +231,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -257,8 +243,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
- nodesJsonData.keySet().forEach(nodeXpath ->
- processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
}
@Override
@@ -279,9 +263,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -290,9 +272,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
- processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
}
@Override
@@ -302,9 +282,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
- dataNodeXpaths.forEach(dataNodeXpath ->
- processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
}
@Override
@@ -313,8 +290,6 @@ public class CpsDataServiceImpl implements CpsDataService {
public void deleteDataNodes(final String dataspaceName, final String anchorName,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
- processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
}
@@ -325,9 +300,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName);
cpsValidator.validateNameCharacters(anchorNames);
- for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
- processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
- }
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
}
@@ -337,9 +309,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
- processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
}
private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
@@ -385,16 +355,6 @@ public class CpsDataServiceImpl implements CpsDataService {
.collect(Collectors.toList());
}
- private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
- final Operation operation, final OffsetDateTime observedTimestamp) {
- try {
- notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
- } catch (final Exception exception) {
- //If async message can't be queued for notification service, the initial request should not fail.
- log.error("Failed to send message to notification service", exception);
- }
- }
-
private SchemaContext getSchemaContext(final Anchor anchor) {
return yangTextSchemaSourceSetCache
.get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
deleted file mode 100644
index 696fd60f8c..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021-2022 Bell Canada.
- * Modifications Copyright (c) 2022-2023 Nordix Foundation
- * Modifications Copyright (C) 2023 TechMahindra Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.UUID;
-import lombok.AllArgsConstructor;
-import lombok.SneakyThrows;
-import org.onap.cps.api.CpsDataService;
-import org.onap.cps.event.model.Content;
-import org.onap.cps.event.model.CpsDataUpdatedEvent;
-import org.onap.cps.event.model.Data;
-import org.onap.cps.spi.FetchDescendantsOption;
-import org.onap.cps.spi.model.Anchor;
-import org.onap.cps.spi.model.DataNode;
-import org.onap.cps.utils.DataMapUtils;
-import org.onap.cps.utils.PrefixResolver;
-import org.springframework.context.annotation.Lazy;
-import org.springframework.stereotype.Component;
-
-@Component
-@AllArgsConstructor(onConstructor = @__(@Lazy))
-public class CpsDataUpdatedEventFactory {
-
- private static final DateTimeFormatter DATE_TIME_FORMATTER =
- DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
-
- @Lazy
- private final CpsDataService cpsDataService;
-
- @Lazy
- private final PrefixResolver prefixResolver;
-
- /**
- * Generates CPS Data Updated event. If observedTimestamp is not provided, then current timestamp is used.
- *
- * @param anchor anchor
- * @param observedTimestamp observedTimestamp
- * @param operation operation
- * @return CpsDataUpdatedEvent
- */
- public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor,
- final OffsetDateTime observedTimestamp, final Operation operation) {
- final var dataNode = (operation == Operation.DELETE) ? null :
- cpsDataService.getDataNodes(anchor.getDataspaceName(), anchor.getName(),
- "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS).iterator().next();
- return toCpsDataUpdatedEvent(anchor, dataNode, observedTimestamp, operation);
- }
-
- @SneakyThrows(URISyntaxException.class)
- private CpsDataUpdatedEvent toCpsDataUpdatedEvent(final Anchor anchor,
- final DataNode dataNode,
- final OffsetDateTime observedTimestamp,
- final Operation operation) {
- final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
- cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode, observedTimestamp, operation));
- cpsDataUpdatedEvent.withId(UUID.randomUUID().toString());
- cpsDataUpdatedEvent.withSchema(new URI("urn:cps:org.onap.cps:data-updated-event-schema:v1"));
- cpsDataUpdatedEvent.withSource(new URI("urn:cps:org.onap.cps"));
- cpsDataUpdatedEvent.withType("org.onap.cps.data-updated-event");
- return cpsDataUpdatedEvent;
- }
-
- private Data createData(final DataNode dataNode, final String prefix) {
- final Data data = new Data();
- DataMapUtils.toDataMapWithIdentifier(dataNode, prefix).forEach(data::setAdditionalProperty);
- return data;
- }
-
- private Content createContent(final Anchor anchor, final DataNode dataNode,
- final OffsetDateTime observedTimestamp, final Operation operation) {
- final var content = new Content();
- content.withAnchorName(anchor.getName());
- content.withDataspaceName(anchor.getDataspaceName());
- content.withSchemaSetName(anchor.getSchemaSetName());
- content.withOperation(Content.Operation.fromValue(operation.name()));
- content.withObservedTimestamp(
- DATE_TIME_FORMATTER.format(observedTimestamp == null ? OffsetDateTime.now() : observedTimestamp));
- if (dataNode != null) {
- final String prefix = prefixResolver.getPrefix(anchor, dataNode.getXpath());
- content.withData(createData(dataNode, prefix));
- }
- return content;
- }
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java b/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java
deleted file mode 100644
index f4b68c0699..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.springframework.kafka.support.ProducerListener;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class KafkaProducerListener<K, V> implements ProducerListener<K, V> {
-
- private NotificationErrorHandler notificationErrorHandler;
-
- public KafkaProducerListener(final NotificationErrorHandler notificationErrorHandler) {
- this.notificationErrorHandler = notificationErrorHandler;
- }
-
- @Override
- public void onSuccess(final ProducerRecord<K, V> producerRecord, final RecordMetadata recordMetadata) {
- log.debug("Message sent to event-bus topic :'{}' with body : {} ", producerRecord.topic(),
- producerRecord.value());
- }
-
- @Override
- public void onError(final ProducerRecord<K, V> producerRecord,
- final RecordMetadata recordMetadata,
- final Exception exception) {
- notificationErrorHandler.onException("Failed to send message to message bus",
- exception, producerRecord, recordMetadata);
- }
-
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java
deleted file mode 100644
index eef028d5f3..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class NotificationErrorHandler {
-
- void onException(final Exception exception, final Object... context) {
- onException("Failed to process", exception, context);
- }
-
- void onException(final String message, final Exception exception, final Object... context) {
- log.error("{} \n Error cause: {} \n Error context: {}",
- message,
- exception.getCause() != null ? exception.getCause().toString() : exception.getMessage(),
- context,
- exception);
- }
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java
deleted file mode 100644
index b8a7144b3d..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import jakarta.validation.constraints.NotNull;
-import java.util.Collections;
-import java.util.Map;
-import lombok.Data;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-import org.springframework.validation.annotation.Validated;
-
-@ConfigurationProperties(prefix = "notification.data-updated")
-@Component
-@Data
-@Validated
-public class NotificationProperties {
-
- @NotNull
- private String topic;
- private Map<String, String> filters = Collections.emptyMap();
-
- @Value("${notification.enabled:true}")
- private boolean enabled;
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
deleted file mode 100644
index 2d87488245..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021 Bell Canada.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import lombok.extern.slf4j.Slf4j;
-import org.checkerframework.checker.nullness.qual.NonNull;
-import org.onap.cps.event.model.CpsDataUpdatedEvent;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class NotificationPublisher {
-
- private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
- private String topicName;
-
- /**
- * Create an instance of Notification Publisher.
- *
- * @param kafkaTemplate kafkaTemplate is send event using kafka
- * @param topicName topic, to which cpsDataUpdatedEvent is sent, is provided by setting
- * 'notification.data-updated.topic' in the application properties
- */
- @Autowired
- public NotificationPublisher(
- final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
- final @Value("${notification.data-updated.topic}") String topicName) {
- this.kafkaTemplate = kafkaTemplate;
- this.topicName = topicName;
- }
-
- /**
- * Send event to Kafka with correct message key.
- *
- * @param cpsDataUpdatedEvent event to be sent to kafka
- */
- public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
- final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
- + cpsDataUpdatedEvent.getContent().getAnchorName();
- log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
- messageKey, cpsDataUpdatedEvent);
- kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
- }
-
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
deleted file mode 100644
index c29d042293..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021-2022 Bell Canada.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import jakarta.annotation.PostConstruct;
-import java.time.OffsetDateTime;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.CpsAdminService;
-import org.onap.cps.spi.model.Anchor;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Service;
-
-@Service
-@Slf4j
-@RequiredArgsConstructor
-public class NotificationService {
-
- private final NotificationProperties notificationProperties;
- private final NotificationPublisher notificationPublisher;
- private final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
- private final NotificationErrorHandler notificationErrorHandler;
- private final CpsAdminService cpsAdminService;
- private List<Pattern> dataspacePatterns;
-
- @PostConstruct
- public void init() {
- log.info("Notification Properties {}", notificationProperties);
- this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties);
- }
-
- private List<Pattern> getDataspaceFilterPatterns(final NotificationProperties notificationProperties) {
- if (notificationProperties.isEnabled()) {
- return Arrays.stream(notificationProperties.getFilters()
- .getOrDefault("enabled-dataspaces", "")
- .split(","))
- .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE))
- .collect(Collectors.toList());
- } else {
- return Collections.emptyList();
- }
- }
-
- /**
- * Process Data Updated Event and publishes the notification.
- *
- * @param anchor anchor
- * @param xpath xpath of changed data node
- * @param operation operation
- * @param observedTimestamp observedTimestamp
- * @return future
- */
- @Async("notificationExecutor")
- public Future<Void> processDataUpdatedEvent(final Anchor anchor, final String xpath, final Operation operation,
- final OffsetDateTime observedTimestamp) {
-
- log.debug("process data updated event for anchor '{}'", anchor);
- try {
- if (shouldSendNotification(anchor.getDataspaceName())) {
- final var cpsDataUpdatedEvent =
- cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor,
- observedTimestamp, getRootNodeOperation(xpath, operation));
- log.debug("data updated event to be published {}", cpsDataUpdatedEvent);
- notificationPublisher.sendNotification(cpsDataUpdatedEvent);
- }
- } catch (final Exception exception) {
- /* All the exceptions are handled to not to propagate it to caller.
- CPS operation should not fail if sending event fails for any reason.
- */
- notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
- exception, anchor, xpath, operation);
- }
- return CompletableFuture.completedFuture(null);
- }
-
- /*
- Add more complex rules based on dataspace and anchor later
- */
- private boolean shouldSendNotification(final String dataspaceName) {
-
- return notificationProperties.isEnabled()
- && dataspacePatterns.stream()
- .anyMatch(pattern -> pattern.matcher(dataspaceName).find());
- }
-
- private Operation getRootNodeOperation(final String xpath, final Operation operation) {
- return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE;
- }
-
- private static boolean isRootXpath(final String xpath) {
- return "/".equals(xpath) || "".equals(xpath);
- }
-
- private static boolean isRootContainerNodeXpath(final String xpath) {
- return 0 == xpath.lastIndexOf('/');
- }
-
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/Operation.java b/cps-service/src/main/java/org/onap/cps/notification/Operation.java
deleted file mode 100644
index 83e1ccf79f..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/Operation.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2022 Bell Canada.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-public enum Operation {
- CREATE,
- UPDATE,
- DELETE
-}
diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy
index a914598521..6ff708a6ea 100644
--- a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy
@@ -26,8 +26,6 @@ package org.onap.cps.api.impl
import org.onap.cps.TestUtils
import org.onap.cps.api.CpsAdminService
import org.onap.cps.api.CpsDeltaService
-import org.onap.cps.notification.NotificationService
-import org.onap.cps.notification.Operation
import org.onap.cps.spi.CpsDataPersistenceService
import org.onap.cps.spi.FetchDescendantsOption
import org.onap.cps.spi.exceptions.ConcurrencyException
@@ -38,7 +36,6 @@ import org.onap.cps.spi.exceptions.SessionTimeoutException
import org.onap.cps.spi.model.Anchor
import org.onap.cps.spi.model.DataNode
import org.onap.cps.spi.model.DataNodeBuilder
-import org.onap.cps.spi.model.DeltaReportBuilder
import org.onap.cps.spi.utils.CpsValidator
import org.onap.cps.utils.ContentType
import org.onap.cps.utils.TimedYangParser
@@ -54,13 +51,12 @@ class CpsDataServiceImplSpec extends Specification {
def mockCpsDataPersistenceService = Mock(CpsDataPersistenceService)
def mockCpsAdminService = Mock(CpsAdminService)
def mockYangTextSchemaSourceSetCache = Mock(YangTextSchemaSourceSetCache)
- def mockNotificationService = Mock(NotificationService)
def mockCpsValidator = Mock(CpsValidator)
def timedYangParser = new TimedYangParser()
def mockCpsDeltaService = Mock(CpsDeltaService);
def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsAdminService,
- mockYangTextSchemaSourceSetCache, mockNotificationService, mockCpsValidator, timedYangParser, mockCpsDeltaService)
+ mockYangTextSchemaSourceSetCache, mockCpsValidator, timedYangParser, mockCpsDeltaService)
def setup() {
@@ -92,8 +88,6 @@ class CpsDataServiceImplSpec extends Specification {
{ dataNode -> dataNode.xpath[0] == '/test-tree' })
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/', Operation.CREATE, observedTimestamp)
where: 'given parameters'
scenario | dataFile | contentType
'json' | 'test-tree.json' | ContentType.JSON
@@ -115,18 +109,6 @@ class CpsDataServiceImplSpec extends Specification {
'invalid xml' | '<invalid xml' | ContentType.XML || 'Failed to parse xml data'
}
- def 'Saving #scenarioDesired data exception during notification.'() {
- given: 'schema set for given anchor and dataspace references test-tree model'
- setupSchemaSetMocks('test-tree.yang')
- and: 'the notification service throws an exception'
- mockNotificationService.processDataUpdatedEvent(*_) >> { throw new RuntimeException('to be ignored')}
- when: 'save data method is invoked with test-tree json data'
- def data = TestUtils.getResourceFileContent('test-tree.json')
- objectUnderTest.saveData(dataspaceName, anchorName, data, observedTimestamp)
- then: 'the exception is ignored'
- noExceptionThrown()
- }
-
def 'Saving list element data fragment under Root node.'() {
given: 'schema set for given anchor and dataspace references bookstore model'
setupSchemaSetMocks('bookstore.yang')
@@ -145,8 +127,6 @@ class CpsDataServiceImplSpec extends Specification {
)
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/', Operation.UPDATE, observedTimestamp)
}
def 'Saving child data fragment under existing node.'() {
@@ -160,8 +140,6 @@ class CpsDataServiceImplSpec extends Specification {
{ dataNode -> dataNode.xpath[0] == '/test-tree/branch[@name=\'New\']' })
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.CREATE, observedTimestamp)
}
def 'Saving list element data fragment under existing node.'() {
@@ -182,8 +160,6 @@ class CpsDataServiceImplSpec extends Specification {
)
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.UPDATE, observedTimestamp)
}
def 'Saving collection of a batch with data fragment under existing node.'() {
@@ -202,8 +178,6 @@ class CpsDataServiceImplSpec extends Specification {
assert listOfXpaths.containsAll(['/test-tree/branch[@name=\'B\']','/test-tree/branch[@name=\'A\']'])
}
}
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.UPDATE, observedTimestamp)
}
def 'Saving empty list element data fragment.'() {
@@ -266,8 +240,6 @@ class CpsDataServiceImplSpec extends Specification {
1 * mockCpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, {dataNode -> dataNode.keySet()[0] == expectedNodeXpath})
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp)
where: 'following parameters were used'
scenario | parentNodeXpath | jsonData || expectedNodeXpath
'top level node' | '/' | '{"test-tree": {"branch": []}}' || '/test-tree'
@@ -300,8 +272,6 @@ class CpsDataServiceImplSpec extends Specification {
1 * mockCpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, {dataNode -> dataNode.keySet()[index] == expectedNodeXpath})
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp)
where: 'the following parameters were used'
index | expectedNodeXpath
0 | '/first-container'
@@ -325,8 +295,6 @@ class CpsDataServiceImplSpec extends Specification {
.iterator().next() == "/bookstore/categories[@code='01']/books[@title='new']"})
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'the data updated event is sent to the notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/bookstore', Operation.UPDATE, observedTimestamp)
}
def 'Replace data node using singular data node: #scenario.'() {
@@ -337,8 +305,6 @@ class CpsDataServiceImplSpec extends Specification {
then: 'the persistence service method is invoked with correct parameters'
1 * mockCpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName,
{ dataNode -> dataNode.xpath == expectedNodeXpath})
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp)
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
where: 'following parameters were used'
@@ -356,10 +322,6 @@ class CpsDataServiceImplSpec extends Specification {
then: 'the persistence service method is invoked with correct parameters'
1 * mockCpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName,
{ dataNode -> dataNode.xpath == expectedNodeXpath})
- and: 'data updated event is sent to notification service'
- nodesJsonData.keySet().each {
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, it, Operation.UPDATE, observedTimestamp)
- }
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
where: 'following parameters were used'
@@ -399,8 +361,6 @@ class CpsDataServiceImplSpec extends Specification {
)
and: 'the CpsValidator is called on the dataspaceName and AnchorName twice'
2 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.UPDATE, observedTimestamp)
}
def 'Replace whole list content with empty list element.'() {
@@ -420,8 +380,6 @@ class CpsDataServiceImplSpec extends Specification {
1 * mockCpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, '/test-tree/branch')
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree/branch', Operation.DELETE, observedTimestamp)
}
def 'Delete multiple list elements under existing node.'() {
@@ -431,8 +389,6 @@ class CpsDataServiceImplSpec extends Specification {
1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, ['/test-tree/branch[@name="A"]', '/test-tree/branch[@name="B"]'])
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'two data updated events are sent to notification service'
- 2 * mockNotificationService.processDataUpdatedEvent(anchor, _, Operation.DELETE, observedTimestamp)
}
def 'Delete data node under anchor and dataspace.'() {
@@ -442,16 +398,12 @@ class CpsDataServiceImplSpec extends Specification {
1 * mockCpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, '/data-node')
and: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
- and: 'data updated event is sent to notification service'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/data-node', Operation.DELETE, observedTimestamp)
}
def 'Delete all data nodes for a given anchor and dataspace.'() {
when: 'delete data nodes method is invoked with correct parameters'
objectUnderTest.deleteDataNodes(dataspaceName, anchorName, observedTimestamp)
- then: 'data updated event is sent to notification service before the delete'
- 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/', Operation.DELETE, observedTimestamp)
- and: 'the CpsValidator is called on the dataspaceName and AnchorName'
+ then: 'the CpsValidator is called on the dataspaceName and AnchorName'
1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName)
and: 'the persistence service method is invoked with the correct parameters'
1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName)
@@ -479,9 +431,7 @@ class CpsDataServiceImplSpec extends Specification {
new Anchor(name: 'anchor2', dataspaceName: dataspaceName)]
when: 'delete data node method is invoked with correct parameters'
objectUnderTest.deleteDataNodes(dataspaceName, ['anchor1', 'anchor2'], observedTimestamp)
- then: 'data updated events are sent to notification service before the delete'
- 2 * mockNotificationService.processDataUpdatedEvent(_, '/', Operation.DELETE, observedTimestamp)
- and: 'the CpsValidator is called on the dataspace name and the anchor names'
+ then: 'the CpsValidator is called on the dataspace name and the anchor names'
2 * mockCpsValidator.validateNameCharacters(_)
and: 'the persistence service method is invoked with the correct parameters'
1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, _ as Collection<String>)
diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy
index 1b873ec12b..118ee1cd02 100755
--- a/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy
@@ -26,7 +26,7 @@ package org.onap.cps.api.impl
import org.onap.cps.TestUtils
import org.onap.cps.api.CpsAdminService
import org.onap.cps.api.CpsDeltaService
-import org.onap.cps.notification.NotificationService
+import org.onap.cps.spi.CpsDataPersistenceService
import org.onap.cps.spi.CpsDataPersistenceService
import org.onap.cps.spi.CpsModulePersistenceService
import org.onap.cps.spi.model.Anchor
@@ -41,7 +41,6 @@ class E2ENetworkSliceSpec extends Specification {
def mockModuleStoreService = Mock(CpsModulePersistenceService)
def mockDataStoreService = Mock(CpsDataPersistenceService)
def mockCpsAdminService = Mock(CpsAdminService)
- def mockNotificationService = Mock(NotificationService)
def mockYangTextSchemaSourceSetCache = Mock(YangTextSchemaSourceSetCache)
def mockCpsValidator = Mock(CpsValidator)
def timedYangTextSchemaSourceSetBuilder = new TimedYangTextSchemaSourceSetBuilder()
@@ -52,7 +51,7 @@ class E2ENetworkSliceSpec extends Specification {
mockYangTextSchemaSourceSetCache, mockCpsAdminService, mockCpsValidator,timedYangTextSchemaSourceSetBuilder)
def cpsDataServiceImpl = new CpsDataServiceImpl(mockDataStoreService, mockCpsAdminService,
- mockYangTextSchemaSourceSetCache, mockNotificationService, mockCpsValidator, timedYangParser, mockCpsDeltaService)
+ mockYangTextSchemaSourceSetCache, mockCpsValidator, timedYangParser, mockCpsDeltaService)
def dataspaceName = 'someDataspace'
def anchorName = 'someAnchor'
diff --git a/cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy
new file mode 100644
index 0000000000..9f4e81aabc
--- /dev/null
+++ b/cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.config
+
+import spock.lang.Specification
+
+class AsyncConfigSpec extends Specification {
+
+ def objectUnderTest = new AsyncConfig()
+
+ def 'Create Async Config and validate it'() {
+ when: 'we set some test properties to tune taskexecutor'
+ objectUnderTest.setCorePoolSize(5)
+ objectUnderTest.setMaxPoolSize(50)
+ objectUnderTest.setQueueCapacity(100)
+ objectUnderTest.setThreadNamePrefix('Test-')
+ objectUnderTest.setWaitForTasksToCompleteOnShutdown(true)
+ then: 'we can instantiate a Async Config object'
+ assert objectUnderTest != null
+ and: 'taskexector is configured with correct properties'
+ def tasExecutor = objectUnderTest.getThreadAsyncExecutorForNotification()
+ assert tasExecutor.properties['corePoolSize'] == 5
+ assert tasExecutor.properties['maxPoolSize'] == 50
+ assert tasExecutor.properties['queueCapacity'] == 100
+ assert tasExecutor.properties['keepAliveSeconds'] == 60
+ assert tasExecutor.properties['threadNamePrefix'] == 'Test-'
+ }
+}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy
deleted file mode 100644
index 49f4bf3850..0000000000
--- a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021-2022 Bell Canada.
- * Modifications Copyright (c) 2022-2023 Nordix Foundation
- * Modifications Copyright (C) 2023 TechMahindra Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification
-
-import org.onap.cps.spi.model.DataNode
-
-import java.time.OffsetDateTime
-import java.time.format.DateTimeFormatter
-import org.onap.cps.utils.DateTimeUtility
-import org.onap.cps.utils.PrefixResolver
-import org.onap.cps.api.CpsDataService
-import org.onap.cps.event.model.Content
-import org.onap.cps.event.model.Data
-import org.onap.cps.spi.FetchDescendantsOption
-import org.onap.cps.spi.model.Anchor
-import org.onap.cps.spi.model.DataNodeBuilder
-import org.springframework.util.StringUtils
-import spock.lang.Specification
-
-class CpsDataUpdatedEventFactorySpec extends Specification {
-
- def mockCpsDataService = Mock(CpsDataService)
-
- def mockPrefixResolver = Mock(PrefixResolver)
-
- def objectUnderTest = new CpsDataUpdatedEventFactory(mockCpsDataService, mockPrefixResolver)
-
- def dateTimeFormat = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
-
- def 'Create a CPS data updated event successfully: #scenario'() {
- given: 'an anchor which has been updated'
- def anchor = new Anchor('my-anchorname', 'my-dataspace', 'my-schemaset-name')
- and: 'cps data service returns the data node details'
- def xpath = '/xpath'
- def dataNode = new DataNodeBuilder().withXpath(xpath).withLeaves(['leafName': 'leafValue']).build()
- mockCpsDataService.getDataNodes(
- 'my-dataspace', 'my-anchorname', '/', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [dataNode]
- when: 'CPS data updated event is created'
- def cpsDataUpdatedEvent = objectUnderTest.createCpsDataUpdatedEvent(anchor,
- DateTimeUtility.toOffsetDateTime(inputObservedTimestamp), Operation.CREATE)
- then: 'CPS data updated event is created with correct envelope'
- with(cpsDataUpdatedEvent) {
- type == 'org.onap.cps.data-updated-event'
- source == new URI('urn:cps:org.onap.cps')
- schema == new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
- StringUtils.hasText(id)
- content != null
- }
- and: 'correct content'
- with(cpsDataUpdatedEvent.content) {
- assert isExpectedDateTimeFormat(observedTimestamp): "$observedTimestamp is not in $dateTimeFormat format"
- if (inputObservedTimestamp != null)
- assert observedTimestamp == inputObservedTimestamp
- else
- assert OffsetDateTime.now().minusSeconds(20).isBefore(
- DateTimeUtility.toOffsetDateTime(observedTimestamp))
- assert anchorName == 'my-anchorname'
- assert dataspaceName == 'my-dataspace'
- assert schemaSetName == 'my-schemaset-name'
- assert operation == Content.Operation.CREATE
- assert data == new Data().withAdditionalProperty('xpath', ['leafName': 'leafValue'])
- }
- where:
- scenario | inputObservedTimestamp
- 'with observed timestamp -0400' | '2021-01-01T23:00:00.345-0400'
- 'with observed timestamp +0400' | '2021-01-01T23:00:00.345+0400'
- 'missing observed timestamp' | null
- }
-
- def 'Create a delete CPS data updated event successfully'() {
- given: 'an anchor which has been deleted'
- def anchor = new Anchor('my-anchorname', 'my-dataspace', 'my-schemaset-name')
- def deletionTimestamp = '2021-01-01T23:00:00.345-0400'
- when: 'a delete root data node event is created'
- def cpsDataUpdatedEvent = objectUnderTest.createCpsDataUpdatedEvent(anchor,
- DateTimeUtility.toOffsetDateTime(deletionTimestamp), Operation.DELETE)
- then: 'CPS data updated event is created with correct envelope'
- with(cpsDataUpdatedEvent) {
- type == 'org.onap.cps.data-updated-event'
- source == new URI('urn:cps:org.onap.cps')
- schema == new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
- StringUtils.hasText(id)
- content != null
- }
- and: 'correct content'
- with(cpsDataUpdatedEvent.content) {
- assert isExpectedDateTimeFormat(observedTimestamp): "$observedTimestamp is not in $dateTimeFormat format"
- assert observedTimestamp == deletionTimestamp
- assert anchorName == 'my-anchorname'
- assert dataspaceName == 'my-dataspace'
- assert schemaSetName == 'my-schemaset-name'
- assert operation == Content.Operation.DELETE
- assert data == null
- }
- }
-
- def 'Create CPS Data Event with URI Syntax Exception'() {
- given: 'an anchor'
- def anchor = new Anchor('my-anchorname', 'my-dataspace', 'my-schemaset-name')
- and: 'a mocked data Node (collection)'
- def mockDataNode = Mock(DataNode)
- mockCpsDataService.getDataNodes(*_) >> [ mockDataNode ]
- and: 'a URI syntax exception is thrown somewhere (using datanode as cannot manipulate hardcoded URIs'
- def originalException = new URISyntaxException('input', 'reason', 0)
- mockDataNode.getXpath() >> { throw originalException }
- when: 'attempt to create data updated event'
- objectUnderTest.createCpsDataUpdatedEvent(anchor, OffsetDateTime.now(), Operation.UPDATE)
- then: 'the same exception is thrown up'
- def thrownUp = thrown(URISyntaxException)
- assert thrownUp == originalException
- }
-
- def isExpectedDateTimeFormat(String observedTimestamp) {
- try {
- DateTimeFormatter.ofPattern(dateTimeFormat).parse(observedTimestamp)
- } catch (DateTimeParseException) {
- return false
- }
- return true
- }
-
-}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy
deleted file mode 100644
index b60b38f054..0000000000
--- a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.kafka.config.TopicBuilder
-import org.springframework.kafka.core.ConsumerFactory
-import org.springframework.kafka.core.KafkaAdmin
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
-import org.springframework.kafka.listener.ContainerProperties
-import org.springframework.kafka.listener.MessageListener
-import org.springframework.kafka.test.utils.ContainerTestUtils
-import org.springframework.test.context.ContextConfiguration
-import org.springframework.test.context.DynamicPropertyRegistry
-import org.springframework.test.context.DynamicPropertySource
-import spock.lang.Shared
-import spock.lang.Specification
-
-@ContextConfiguration(classes = [KafkaAutoConfiguration, KafkaProducerListener, NotificationErrorHandler])
-@SpringBootTest
-class KafkaPublisherSpecBase extends Specification {
-
- @Autowired
- KafkaTemplate kafkaTemplate
-
- @Autowired
- KafkaAdmin kafkaAdmin
-
- @Autowired
- ConsumerFactory consumerFactory
-
- @Shared volatile topicCreated = false
- @Shared consumedMessages = new ArrayList<>()
-
- def cpsEventTopic = 'cps-events'
-
- @DynamicPropertySource
- static void registerKafkaProperties(DynamicPropertyRegistry registry) {
- registry.add("spring.kafka.bootstrap-servers", KafkaTestContainerConfig::getBootstrapServers)
- }
-
- def setup() {
- // Kafka listener and topic should be created only once for a test-suite.
- // We are also dependent on sprint context to achieve it, and can not execute it in setupSpec
- if (!topicCreated) {
- kafkaAdmin.createOrModifyTopics(TopicBuilder.name(cpsEventTopic).partitions(1).replicas(1).build())
- startListeningToTopic()
- topicCreated = true
- }
- /* kafka message listener stores the messages to consumedMessages.
- It is important to clear the list before each test case so that test cases can fetch the message from index '0'.
- */
- consumedMessages.clear()
- }
-
- def startListeningToTopic() {
- ContainerProperties containerProperties = new ContainerProperties(cpsEventTopic)
- containerProperties.setMessageListener([
- onMessage: {
- record ->
- consumedMessages.add(record.value())
- }] as MessageListener)
-
- ConcurrentMessageListenerContainer container =
- new ConcurrentMessageListenerContainer<>(
- consumerFactory,
- containerProperties)
-
- container.start()
- ContainerTestUtils.waitForAssignment(container, 1)
- }
-
-}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy
deleted file mode 100644
index b07b31a35b..0000000000
--- a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification
-
-import org.testcontainers.containers.KafkaContainer
-import org.testcontainers.utility.DockerImageName
-
-class KafkaTestContainerConfig {
-
- private static KafkaContainer kafkaContainer
-
- static {
- getKafkaContainer()
- }
-
- // Not the best performance but it is good enough for test case
- private static synchronized KafkaContainer getKafkaContainer() {
- if (kafkaContainer == null) {
- kafkaContainer = new KafkaContainer(DockerImageName.parse("registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1").asCompatibleSubstituteFor("confluentinc/cp-kafka"))
- .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
- kafkaContainer.start()
- Runtime.getRuntime().addShutdownHook(new Thread(kafkaContainer::stop))
- }
- return kafkaContainer
- }
-
- static String getBootstrapServers() {
- getKafkaContainer()
- return kafkaContainer.getBootstrapServers()
- }
-
-}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy
deleted file mode 100644
index 89e305aedb..0000000000
--- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification
-
-import ch.qos.logback.classic.Logger
-import ch.qos.logback.classic.spi.ILoggingEvent
-import ch.qos.logback.core.read.ListAppender
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
-import org.slf4j.LoggerFactory
-
-import spock.lang.Specification
-
-class NotificationErrorHandlerSpec extends Specification{
-
- NotificationErrorHandler objectUnderTest = new NotificationErrorHandler()
- def logWatcher = Spy(ListAppender<ILoggingEvent>)
-
- @BeforeEach
- void setup() {
- ((Logger) LoggerFactory.getLogger(NotificationErrorHandler.class)).addAppender(logWatcher);
- logWatcher.start();
- }
-
- @AfterEach
- void teardown() {
- ((Logger) LoggerFactory.getLogger(NotificationErrorHandler.class)).detachAndStopAllAppenders();
- }
-
- def 'Logging exception via notification error handler #scenario'() {
- when: 'exception #scenario occurs'
- objectUnderTest.onException(exception, 'some context')
- then: 'log output results contains the correct error details'
- def logMessage = logWatcher.list[0].getFormattedMessage()
- assert logMessage.contains('Failed to process')
- assert logMessage.contains("Error cause: ${exptectedCauseString}")
- assert logMessage.contains("Error context: [some context]")
- where:
- scenario | exception || exptectedCauseString
- 'with cause' | new Exception('message') || 'message'
- 'without cause' | new Exception('message', new RuntimeException('cause')) || 'java.lang.RuntimeException: cause'
- }
-}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy
deleted file mode 100644
index 6cd9ae1b20..0000000000
--- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2021-2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification
-
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.onap.cps.event.model.Content
-import org.onap.cps.event.model.CpsDataUpdatedEvent
-import org.spockframework.spring.SpringBean
-import org.springframework.kafka.KafkaException
-import org.springframework.kafka.core.KafkaTemplate
-import spock.util.concurrent.PollingConditions
-
-class NotificationPublisherSpec extends KafkaPublisherSpecBase {
-
- @SpringBean
- NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler())
-
- @SpringBean
- KafkaProducerListener spyKafkaProducerListener = Spy(new KafkaProducerListener<>(spyNotificationErrorHandler))
-
- KafkaTemplate spyKafkaTemplate
- NotificationPublisher objectUnderTest
-
- def myAnchorName = 'my-anchor'
- def myDataspaceName = 'my-dataspace'
-
- def cpsDataUpdatedEvent = new CpsDataUpdatedEvent()
- .withContent(new Content()
- .withDataspaceName(myDataspaceName)
- .withAnchorName(myAnchorName))
-
- def setup() {
- spyKafkaTemplate = Spy(kafkaTemplate)
- objectUnderTest = new NotificationPublisher(spyKafkaTemplate, cpsEventTopic);
- }
-
- def 'Sending event to message bus with correct message Key.'() {
-
- when: 'event is sent to publisher'
- objectUnderTest.sendNotification(cpsDataUpdatedEvent)
- kafkaTemplate.flush()
-
- then: 'event is sent to correct topic with the expected messageKey'
- interaction {
- def messageKey = myDataspaceName + "," + myAnchorName
- 1 * spyKafkaTemplate.send(cpsEventTopic, messageKey, cpsDataUpdatedEvent)
- }
- and: 'received a successful response'
- 1 * spyKafkaProducerListener.onSuccess(_ as ProducerRecord, _)
- and: 'kafka consumer returns expected message'
- def conditions = new PollingConditions(timeout: 60, initialDelay: 0, factor: 1)
- conditions.eventually {
- assert cpsDataUpdatedEvent == consumedMessages.get(0)
- }
- }
-
- def 'Handling of async errors from message bus.'() {
- given: 'topic does not exist'
- objectUnderTest.topicName = 'non-existing-topic'
-
- when: 'message to sent to a non-existing topic'
- objectUnderTest.sendNotification(cpsDataUpdatedEvent)
- kafkaTemplate.flush()
-
- then: 'error is thrown'
- thrown KafkaException
- and: 'error handler is called with exception details'
- 1 * spyKafkaProducerListener.onError(_ as ProducerRecord, _, _ as Exception)
- 1 * spyNotificationErrorHandler.onException(_ as String, _ as Exception,
- _ as ProducerRecord, _)
- }
-
-}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
deleted file mode 100644
index f07f89b391..0000000000
--- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021-2022 Bell Canada.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification
-
-import org.onap.cps.api.CpsAdminService
-import org.onap.cps.config.AsyncConfig
-import org.onap.cps.event.model.CpsDataUpdatedEvent
-import org.onap.cps.spi.model.Anchor
-import org.spockframework.spring.SpringBean
-import org.spockframework.spring.SpringSpy
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.context.properties.EnableConfigurationProperties
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.context.ContextConfiguration
-import spock.lang.Shared
-import spock.lang.Specification
-
-import java.time.OffsetDateTime
-import java.util.concurrent.TimeUnit
-
-@SpringBootTest
-@EnableConfigurationProperties
-@ContextConfiguration(classes = [NotificationProperties, NotificationService, NotificationErrorHandler, AsyncConfig])
-class NotificationServiceSpec extends Specification {
-
- @SpringSpy
- NotificationProperties spyNotificationProperties
- @SpringBean
- NotificationPublisher mockNotificationPublisher = Mock()
- @SpringBean
- CpsDataUpdatedEventFactory mockCpsDataUpdatedEventFactory = Mock()
- @SpringSpy
- NotificationErrorHandler spyNotificationErrorHandler
- @SpringBean
- CpsAdminService mockCpsAdminService = Mock()
-
- @Autowired
- NotificationService objectUnderTest
-
- @Shared
- def dataspaceName = 'my-dataspace-published'
- @Shared
- def anchorName = 'my-anchorname'
- @Shared
- def anchor = new Anchor('my-anchorname', 'my-dataspace-published', 'my-schemaset-name')
- def myObservedTimestamp = OffsetDateTime.now()
-
- def setup() {
- mockCpsAdminService.getAnchor(dataspaceName, anchorName) >> anchor
- }
-
- def 'Skip sending notification when disabled.'() {
- given: 'notification is disabled'
- spyNotificationProperties.isEnabled() >> false
- when: 'dataUpdatedEvent is received'
- objectUnderTest.processDataUpdatedEvent(anchor, '/', Operation.CREATE, myObservedTimestamp)
- then: 'the notification is not sent'
- 0 * mockNotificationPublisher.sendNotification(_)
- }
-
- def 'Send notification when enabled: #scenario.'() {
- given: 'notification is enabled'
- spyNotificationProperties.isEnabled() >> true
- and: 'an anchor is in dataspace where #scenario'
- def anchor = new Anchor('my-anchorname', dataspaceName, 'my-schemaset-name')
- and: 'event factory can create event successfully'
- def cpsDataUpdatedEvent = new CpsDataUpdatedEvent()
- mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, myObservedTimestamp, Operation.CREATE) >> cpsDataUpdatedEvent
- when: 'dataUpdatedEvent is received'
- def future = objectUnderTest.processDataUpdatedEvent(anchor, '/', Operation.CREATE, myObservedTimestamp)
- and: 'wait for async processing to complete'
- future.get(10, TimeUnit.SECONDS)
- then: 'async process completed successfully'
- future.isDone()
- and: 'notification is sent'
- expectedSendNotificationCount * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent)
- where:
- scenario | dataspaceName || expectedSendNotificationCount
- 'dataspace name does not match filter' | 'does-not-match-pattern' || 0
- 'dataspace name matches filter' | 'my-dataspace-published' || 1
- }
-
- def '#scenario are changed with xpath #xpath and operation #operation'() {
- given: 'notification is enabled'
- spyNotificationProperties.isEnabled() >> true
- and: 'event factory creates event if operation is #operation'
- def cpsDataUpdatedEvent = new CpsDataUpdatedEvent()
- mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, myObservedTimestamp, expectedOperationInEvent) >>
- cpsDataUpdatedEvent
- when: 'dataUpdatedEvent is received for #xpath'
- def future = objectUnderTest.processDataUpdatedEvent(anchor, xpath, operation, myObservedTimestamp)
- and: 'wait for async processing to complete'
- future.get(10, TimeUnit.SECONDS)
- then: 'async process completed successfully'
- future.isDone()
- and: 'notification is sent'
- 1 * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent)
- where:
- scenario | xpath | operation || expectedOperationInEvent
- 'Same event is sent when root nodes' | '' | Operation.CREATE || Operation.CREATE
- 'Same event is sent when root nodes' | '' | Operation.UPDATE || Operation.UPDATE
- 'Same event is sent when root nodes' | '' | Operation.DELETE || Operation.DELETE
- 'Same event is sent when root nodes' | '/' | Operation.CREATE || Operation.CREATE
- 'Same event is sent when root nodes' | '/' | Operation.UPDATE || Operation.UPDATE
- 'Same event is sent when root nodes' | '/' | Operation.DELETE || Operation.DELETE
- 'Same event is sent when container nodes' | '/parent' | Operation.CREATE || Operation.CREATE
- 'Same event is sent when container nodes' | '/parent' | Operation.UPDATE || Operation.UPDATE
- 'Same event is sent when container nodes' | '/parent' | Operation.DELETE || Operation.DELETE
- 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.CREATE || Operation.UPDATE
- 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.UPDATE || Operation.UPDATE
- 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.DELETE || Operation.UPDATE
- }
-
- def 'Error handling in notification service.'() {
- given: 'notification is enabled'
- spyNotificationProperties.isEnabled() >> true
- and: 'event factory can not create event successfully'
- mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, myObservedTimestamp, Operation.CREATE) >>
- { throw new Exception("Could not create event") }
- when: 'event is sent for processing'
- def future = objectUnderTest.processDataUpdatedEvent(anchor, '/', Operation.CREATE, myObservedTimestamp)
- and: 'wait for async processing to complete'
- future.get(10, TimeUnit.SECONDS)
- then: 'async process completed successfully'
- future.isDone()
- and: 'error is handled and not thrown to caller'
- notThrown Exception
- 1 * spyNotificationErrorHandler.onException(_, _, _, '/', Operation.CREATE)
- }
-
- def 'Disabled Notification services'() {
- given: 'a notification service that is disabled'
- spyNotificationProperties.enabled >> false
- NotificationService notificationService = new NotificationService(spyNotificationProperties, mockNotificationPublisher, mockCpsDataUpdatedEventFactory, spyNotificationErrorHandler, mockCpsAdminService)
- notificationService.init()
- expect: 'it will not send notifications'
- assert notificationService.shouldSendNotification('') == false
- }
-}