diff options
Diffstat (limited to 'cps-service')
17 files changed, 51 insertions, 1149 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index e74e0ad249..6672d6883f 100755 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -24,10 +24,6 @@ package org.onap.cps.api.impl; -import static org.onap.cps.notification.Operation.CREATE; -import static org.onap.cps.notification.Operation.DELETE; -import static org.onap.cps.notification.Operation.UPDATE; - import io.micrometer.core.annotation.Timed; import java.io.Serializable; import java.time.OffsetDateTime; @@ -43,8 +39,6 @@ import org.onap.cps.api.CpsAdminService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsDeltaService; import org.onap.cps.cpspath.parser.CpsPathUtil; -import org.onap.cps.notification.NotificationService; -import org.onap.cps.notification.Operation; import org.onap.cps.spi.CpsDataPersistenceService; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.exceptions.DataValidationException; @@ -70,7 +64,6 @@ public class CpsDataServiceImpl implements CpsDataService { private final CpsDataPersistenceService cpsDataPersistenceService; private final CpsAdminService cpsAdminService; private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache; - private final NotificationService notificationService; private final CpsValidator cpsValidator; private final TimedYangParser timedYangParser; private final CpsDeltaService cpsDeltaService; @@ -90,7 +83,6 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType); cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes); - processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp); } @Override @@ -109,7 +101,6 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType); cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes); - processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp); } @Override @@ -127,7 +118,6 @@ public class CpsDataServiceImpl implements CpsDataService { cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath, listElementDataNodeCollection); } - processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp); } @Override @@ -141,7 +131,6 @@ public class CpsDataServiceImpl implements CpsDataService { buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON); cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath, listElementDataNodeCollections); - processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp); } @Override @@ -177,7 +166,6 @@ public class CpsDataServiceImpl implements CpsDataService { final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream() .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves)); cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves); - processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp); } @Override @@ -194,7 +182,6 @@ public class CpsDataServiceImpl implements CpsDataService { for (final DataNode dataNodeUpdate : dataNodeUpdates) { processDataNodeUpdate(anchor, dataNodeUpdate); } - processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp); } @Override @@ -244,7 +231,6 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON); cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes); - processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp); } @Override @@ -257,8 +243,6 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData); cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes); - nodesJsonData.keySet().forEach(nodeXpath -> - processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp)); } @Override @@ -279,9 +263,7 @@ public class CpsDataServiceImpl implements CpsDataService { public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath, final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); - final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes); - processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp); } @Override @@ -290,9 +272,7 @@ public class CpsDataServiceImpl implements CpsDataService { public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); - final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath); - processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp); } @Override @@ -302,9 +282,6 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths); - final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); - dataNodeXpaths.forEach(dataNodeXpath -> - processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp)); } @Override @@ -313,8 +290,6 @@ public class CpsDataServiceImpl implements CpsDataService { public void deleteDataNodes(final String dataspaceName, final String anchorName, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); - final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); - processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName); } @@ -325,9 +300,6 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName); cpsValidator.validateNameCharacters(anchorNames); - for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) { - processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp); - } cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames); } @@ -337,9 +309,7 @@ public class CpsDataServiceImpl implements CpsDataService { public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); - final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath); - processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp); } private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) { @@ -385,16 +355,6 @@ public class CpsDataServiceImpl implements CpsDataService { .collect(Collectors.toList()); } - private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath, - final Operation operation, final OffsetDateTime observedTimestamp) { - try { - notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp); - } catch (final Exception exception) { - //If async message can't be queued for notification service, the initial request should not fail. - log.error("Failed to send message to notification service", exception); - } - } - private SchemaContext getSchemaContext(final Anchor anchor) { return yangTextSchemaSourceSetCache .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext(); diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java deleted file mode 100644 index 696fd60f8c..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2021-2022 Bell Canada. - * Modifications Copyright (c) 2022-2023 Nordix Foundation - * Modifications Copyright (C) 2023 TechMahindra Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -import java.net.URI; -import java.net.URISyntaxException; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; -import java.util.UUID; -import lombok.AllArgsConstructor; -import lombok.SneakyThrows; -import org.onap.cps.api.CpsDataService; -import org.onap.cps.event.model.Content; -import org.onap.cps.event.model.CpsDataUpdatedEvent; -import org.onap.cps.event.model.Data; -import org.onap.cps.spi.FetchDescendantsOption; -import org.onap.cps.spi.model.Anchor; -import org.onap.cps.spi.model.DataNode; -import org.onap.cps.utils.DataMapUtils; -import org.onap.cps.utils.PrefixResolver; -import org.springframework.context.annotation.Lazy; -import org.springframework.stereotype.Component; - -@Component -@AllArgsConstructor(onConstructor = @__(@Lazy)) -public class CpsDataUpdatedEventFactory { - - private static final DateTimeFormatter DATE_TIME_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - - @Lazy - private final CpsDataService cpsDataService; - - @Lazy - private final PrefixResolver prefixResolver; - - /** - * Generates CPS Data Updated event. If observedTimestamp is not provided, then current timestamp is used. - * - * @param anchor anchor - * @param observedTimestamp observedTimestamp - * @param operation operation - * @return CpsDataUpdatedEvent - */ - public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor, - final OffsetDateTime observedTimestamp, final Operation operation) { - final var dataNode = (operation == Operation.DELETE) ? null : - cpsDataService.getDataNodes(anchor.getDataspaceName(), anchor.getName(), - "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS).iterator().next(); - return toCpsDataUpdatedEvent(anchor, dataNode, observedTimestamp, operation); - } - - @SneakyThrows(URISyntaxException.class) - private CpsDataUpdatedEvent toCpsDataUpdatedEvent(final Anchor anchor, - final DataNode dataNode, - final OffsetDateTime observedTimestamp, - final Operation operation) { - final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent(); - cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode, observedTimestamp, operation)); - cpsDataUpdatedEvent.withId(UUID.randomUUID().toString()); - cpsDataUpdatedEvent.withSchema(new URI("urn:cps:org.onap.cps:data-updated-event-schema:v1")); - cpsDataUpdatedEvent.withSource(new URI("urn:cps:org.onap.cps")); - cpsDataUpdatedEvent.withType("org.onap.cps.data-updated-event"); - return cpsDataUpdatedEvent; - } - - private Data createData(final DataNode dataNode, final String prefix) { - final Data data = new Data(); - DataMapUtils.toDataMapWithIdentifier(dataNode, prefix).forEach(data::setAdditionalProperty); - return data; - } - - private Content createContent(final Anchor anchor, final DataNode dataNode, - final OffsetDateTime observedTimestamp, final Operation operation) { - final var content = new Content(); - content.withAnchorName(anchor.getName()); - content.withDataspaceName(anchor.getDataspaceName()); - content.withSchemaSetName(anchor.getSchemaSetName()); - content.withOperation(Content.Operation.fromValue(operation.name())); - content.withObservedTimestamp( - DATE_TIME_FORMATTER.format(observedTimestamp == null ? OffsetDateTime.now() : observedTimestamp)); - if (dataNode != null) { - final String prefix = prefixResolver.getPrefix(anchor, dataNode.getXpath()); - content.withData(createData(dataNode, prefix)); - } - return content; - } -} diff --git a/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java b/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java deleted file mode 100644 index f4b68c0699..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.springframework.kafka.support.ProducerListener; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class KafkaProducerListener<K, V> implements ProducerListener<K, V> { - - private NotificationErrorHandler notificationErrorHandler; - - public KafkaProducerListener(final NotificationErrorHandler notificationErrorHandler) { - this.notificationErrorHandler = notificationErrorHandler; - } - - @Override - public void onSuccess(final ProducerRecord<K, V> producerRecord, final RecordMetadata recordMetadata) { - log.debug("Message sent to event-bus topic :'{}' with body : {} ", producerRecord.topic(), - producerRecord.value()); - } - - @Override - public void onError(final ProducerRecord<K, V> producerRecord, - final RecordMetadata recordMetadata, - final Exception exception) { - notificationErrorHandler.onException("Failed to send message to message bus", - exception, producerRecord, recordMetadata); - } - -} diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java deleted file mode 100644 index eef028d5f3..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -public class NotificationErrorHandler { - - void onException(final Exception exception, final Object... context) { - onException("Failed to process", exception, context); - } - - void onException(final String message, final Exception exception, final Object... context) { - log.error("{} \n Error cause: {} \n Error context: {}", - message, - exception.getCause() != null ? exception.getCause().toString() : exception.getMessage(), - context, - exception); - } -} diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java deleted file mode 100644 index b8a7144b3d..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -import jakarta.validation.constraints.NotNull; -import java.util.Collections; -import java.util.Map; -import lombok.Data; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; -import org.springframework.validation.annotation.Validated; - -@ConfigurationProperties(prefix = "notification.data-updated") -@Component -@Data -@Validated -public class NotificationProperties { - - @NotNull - private String topic; - private Map<String, String> filters = Collections.emptyMap(); - - @Value("${notification.enabled:true}") - private boolean enabled; -} diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java deleted file mode 100644 index 2d87488245..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2021 Bell Canada. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.onap.cps.event.model.CpsDataUpdatedEvent; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -public class NotificationPublisher { - - private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate; - private String topicName; - - /** - * Create an instance of Notification Publisher. - * - * @param kafkaTemplate kafkaTemplate is send event using kafka - * @param topicName topic, to which cpsDataUpdatedEvent is sent, is provided by setting - * 'notification.data-updated.topic' in the application properties - */ - @Autowired - public NotificationPublisher( - final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate, - final @Value("${notification.data-updated.topic}") String topicName) { - this.kafkaTemplate = kafkaTemplate; - this.topicName = topicName; - } - - /** - * Send event to Kafka with correct message key. - * - * @param cpsDataUpdatedEvent event to be sent to kafka - */ - public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) { - final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + "," - + cpsDataUpdatedEvent.getContent().getAnchorName(); - log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ", - messageKey, cpsDataUpdatedEvent); - kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent); - } - -} diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java deleted file mode 100644 index c29d042293..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2021-2022 Bell Canada. - * Modifications Copyright (C) 2022-2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -import jakarta.annotation.PostConstruct; -import java.time.OffsetDateTime; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.api.CpsAdminService; -import org.onap.cps.spi.model.Anchor; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - -@Service -@Slf4j -@RequiredArgsConstructor -public class NotificationService { - - private final NotificationProperties notificationProperties; - private final NotificationPublisher notificationPublisher; - private final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory; - private final NotificationErrorHandler notificationErrorHandler; - private final CpsAdminService cpsAdminService; - private List<Pattern> dataspacePatterns; - - @PostConstruct - public void init() { - log.info("Notification Properties {}", notificationProperties); - this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties); - } - - private List<Pattern> getDataspaceFilterPatterns(final NotificationProperties notificationProperties) { - if (notificationProperties.isEnabled()) { - return Arrays.stream(notificationProperties.getFilters() - .getOrDefault("enabled-dataspaces", "") - .split(",")) - .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE)) - .collect(Collectors.toList()); - } else { - return Collections.emptyList(); - } - } - - /** - * Process Data Updated Event and publishes the notification. - * - * @param anchor anchor - * @param xpath xpath of changed data node - * @param operation operation - * @param observedTimestamp observedTimestamp - * @return future - */ - @Async("notificationExecutor") - public Future<Void> processDataUpdatedEvent(final Anchor anchor, final String xpath, final Operation operation, - final OffsetDateTime observedTimestamp) { - - log.debug("process data updated event for anchor '{}'", anchor); - try { - if (shouldSendNotification(anchor.getDataspaceName())) { - final var cpsDataUpdatedEvent = - cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, - observedTimestamp, getRootNodeOperation(xpath, operation)); - log.debug("data updated event to be published {}", cpsDataUpdatedEvent); - notificationPublisher.sendNotification(cpsDataUpdatedEvent); - } - } catch (final Exception exception) { - /* All the exceptions are handled to not to propagate it to caller. - CPS operation should not fail if sending event fails for any reason. - */ - notificationErrorHandler.onException("Failed to process cps-data-updated-event.", - exception, anchor, xpath, operation); - } - return CompletableFuture.completedFuture(null); - } - - /* - Add more complex rules based on dataspace and anchor later - */ - private boolean shouldSendNotification(final String dataspaceName) { - - return notificationProperties.isEnabled() - && dataspacePatterns.stream() - .anyMatch(pattern -> pattern.matcher(dataspaceName).find()); - } - - private Operation getRootNodeOperation(final String xpath, final Operation operation) { - return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE; - } - - private static boolean isRootXpath(final String xpath) { - return "/".equals(xpath) || "".equals(xpath); - } - - private static boolean isRootContainerNodeXpath(final String xpath) { - return 0 == xpath.lastIndexOf('/'); - } - -} diff --git a/cps-service/src/main/java/org/onap/cps/notification/Operation.java b/cps-service/src/main/java/org/onap/cps/notification/Operation.java deleted file mode 100644 index 83e1ccf79f..0000000000 --- a/cps-service/src/main/java/org/onap/cps/notification/Operation.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2022 Bell Canada. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification; - -public enum Operation { - CREATE, - UPDATE, - DELETE -} diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy index a914598521..6ff708a6ea 100644 --- a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy @@ -26,8 +26,6 @@ package org.onap.cps.api.impl import org.onap.cps.TestUtils import org.onap.cps.api.CpsAdminService import org.onap.cps.api.CpsDeltaService -import org.onap.cps.notification.NotificationService -import org.onap.cps.notification.Operation import org.onap.cps.spi.CpsDataPersistenceService import org.onap.cps.spi.FetchDescendantsOption import org.onap.cps.spi.exceptions.ConcurrencyException @@ -38,7 +36,6 @@ import org.onap.cps.spi.exceptions.SessionTimeoutException import org.onap.cps.spi.model.Anchor import org.onap.cps.spi.model.DataNode import org.onap.cps.spi.model.DataNodeBuilder -import org.onap.cps.spi.model.DeltaReportBuilder import org.onap.cps.spi.utils.CpsValidator import org.onap.cps.utils.ContentType import org.onap.cps.utils.TimedYangParser @@ -54,13 +51,12 @@ class CpsDataServiceImplSpec extends Specification { def mockCpsDataPersistenceService = Mock(CpsDataPersistenceService) def mockCpsAdminService = Mock(CpsAdminService) def mockYangTextSchemaSourceSetCache = Mock(YangTextSchemaSourceSetCache) - def mockNotificationService = Mock(NotificationService) def mockCpsValidator = Mock(CpsValidator) def timedYangParser = new TimedYangParser() def mockCpsDeltaService = Mock(CpsDeltaService); def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsAdminService, - mockYangTextSchemaSourceSetCache, mockNotificationService, mockCpsValidator, timedYangParser, mockCpsDeltaService) + mockYangTextSchemaSourceSetCache, mockCpsValidator, timedYangParser, mockCpsDeltaService) def setup() { @@ -92,8 +88,6 @@ class CpsDataServiceImplSpec extends Specification { { dataNode -> dataNode.xpath[0] == '/test-tree' }) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/', Operation.CREATE, observedTimestamp) where: 'given parameters' scenario | dataFile | contentType 'json' | 'test-tree.json' | ContentType.JSON @@ -115,18 +109,6 @@ class CpsDataServiceImplSpec extends Specification { 'invalid xml' | '<invalid xml' | ContentType.XML || 'Failed to parse xml data' } - def 'Saving #scenarioDesired data exception during notification.'() { - given: 'schema set for given anchor and dataspace references test-tree model' - setupSchemaSetMocks('test-tree.yang') - and: 'the notification service throws an exception' - mockNotificationService.processDataUpdatedEvent(*_) >> { throw new RuntimeException('to be ignored')} - when: 'save data method is invoked with test-tree json data' - def data = TestUtils.getResourceFileContent('test-tree.json') - objectUnderTest.saveData(dataspaceName, anchorName, data, observedTimestamp) - then: 'the exception is ignored' - noExceptionThrown() - } - def 'Saving list element data fragment under Root node.'() { given: 'schema set for given anchor and dataspace references bookstore model' setupSchemaSetMocks('bookstore.yang') @@ -145,8 +127,6 @@ class CpsDataServiceImplSpec extends Specification { ) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/', Operation.UPDATE, observedTimestamp) } def 'Saving child data fragment under existing node.'() { @@ -160,8 +140,6 @@ class CpsDataServiceImplSpec extends Specification { { dataNode -> dataNode.xpath[0] == '/test-tree/branch[@name=\'New\']' }) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.CREATE, observedTimestamp) } def 'Saving list element data fragment under existing node.'() { @@ -182,8 +160,6 @@ class CpsDataServiceImplSpec extends Specification { ) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.UPDATE, observedTimestamp) } def 'Saving collection of a batch with data fragment under existing node.'() { @@ -202,8 +178,6 @@ class CpsDataServiceImplSpec extends Specification { assert listOfXpaths.containsAll(['/test-tree/branch[@name=\'B\']','/test-tree/branch[@name=\'A\']']) } } - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.UPDATE, observedTimestamp) } def 'Saving empty list element data fragment.'() { @@ -266,8 +240,6 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, {dataNode -> dataNode.keySet()[0] == expectedNodeXpath}) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp) where: 'following parameters were used' scenario | parentNodeXpath | jsonData || expectedNodeXpath 'top level node' | '/' | '{"test-tree": {"branch": []}}' || '/test-tree' @@ -300,8 +272,6 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, {dataNode -> dataNode.keySet()[index] == expectedNodeXpath}) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp) where: 'the following parameters were used' index | expectedNodeXpath 0 | '/first-container' @@ -325,8 +295,6 @@ class CpsDataServiceImplSpec extends Specification { .iterator().next() == "/bookstore/categories[@code='01']/books[@title='new']"}) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'the data updated event is sent to the notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/bookstore', Operation.UPDATE, observedTimestamp) } def 'Replace data node using singular data node: #scenario.'() { @@ -337,8 +305,6 @@ class CpsDataServiceImplSpec extends Specification { then: 'the persistence service method is invoked with correct parameters' 1 * mockCpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, { dataNode -> dataNode.xpath == expectedNodeXpath}) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) where: 'following parameters were used' @@ -356,10 +322,6 @@ class CpsDataServiceImplSpec extends Specification { then: 'the persistence service method is invoked with correct parameters' 1 * mockCpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, { dataNode -> dataNode.xpath == expectedNodeXpath}) - and: 'data updated event is sent to notification service' - nodesJsonData.keySet().each { - 1 * mockNotificationService.processDataUpdatedEvent(anchor, it, Operation.UPDATE, observedTimestamp) - } and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) where: 'following parameters were used' @@ -399,8 +361,6 @@ class CpsDataServiceImplSpec extends Specification { ) and: 'the CpsValidator is called on the dataspaceName and AnchorName twice' 2 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree', Operation.UPDATE, observedTimestamp) } def 'Replace whole list content with empty list element.'() { @@ -420,8 +380,6 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, '/test-tree/branch') and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/test-tree/branch', Operation.DELETE, observedTimestamp) } def 'Delete multiple list elements under existing node.'() { @@ -431,8 +389,6 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, ['/test-tree/branch[@name="A"]', '/test-tree/branch[@name="B"]']) and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'two data updated events are sent to notification service' - 2 * mockNotificationService.processDataUpdatedEvent(anchor, _, Operation.DELETE, observedTimestamp) } def 'Delete data node under anchor and dataspace.'() { @@ -442,16 +398,12 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, '/data-node') and: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) - and: 'data updated event is sent to notification service' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/data-node', Operation.DELETE, observedTimestamp) } def 'Delete all data nodes for a given anchor and dataspace.'() { when: 'delete data nodes method is invoked with correct parameters' objectUnderTest.deleteDataNodes(dataspaceName, anchorName, observedTimestamp) - then: 'data updated event is sent to notification service before the delete' - 1 * mockNotificationService.processDataUpdatedEvent(anchor, '/', Operation.DELETE, observedTimestamp) - and: 'the CpsValidator is called on the dataspaceName and AnchorName' + then: 'the CpsValidator is called on the dataspaceName and AnchorName' 1 * mockCpsValidator.validateNameCharacters(dataspaceName, anchorName) and: 'the persistence service method is invoked with the correct parameters' 1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName) @@ -479,9 +431,7 @@ class CpsDataServiceImplSpec extends Specification { new Anchor(name: 'anchor2', dataspaceName: dataspaceName)] when: 'delete data node method is invoked with correct parameters' objectUnderTest.deleteDataNodes(dataspaceName, ['anchor1', 'anchor2'], observedTimestamp) - then: 'data updated events are sent to notification service before the delete' - 2 * mockNotificationService.processDataUpdatedEvent(_, '/', Operation.DELETE, observedTimestamp) - and: 'the CpsValidator is called on the dataspace name and the anchor names' + then: 'the CpsValidator is called on the dataspace name and the anchor names' 2 * mockCpsValidator.validateNameCharacters(_) and: 'the persistence service method is invoked with the correct parameters' 1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, _ as Collection<String>) diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy index 1b873ec12b..118ee1cd02 100755 --- a/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy @@ -26,7 +26,7 @@ package org.onap.cps.api.impl import org.onap.cps.TestUtils
import org.onap.cps.api.CpsAdminService
import org.onap.cps.api.CpsDeltaService
-import org.onap.cps.notification.NotificationService
+import org.onap.cps.spi.CpsDataPersistenceService
import org.onap.cps.spi.CpsDataPersistenceService
import org.onap.cps.spi.CpsModulePersistenceService
import org.onap.cps.spi.model.Anchor
@@ -41,7 +41,6 @@ class E2ENetworkSliceSpec extends Specification { def mockModuleStoreService = Mock(CpsModulePersistenceService)
def mockDataStoreService = Mock(CpsDataPersistenceService)
def mockCpsAdminService = Mock(CpsAdminService)
- def mockNotificationService = Mock(NotificationService)
def mockYangTextSchemaSourceSetCache = Mock(YangTextSchemaSourceSetCache)
def mockCpsValidator = Mock(CpsValidator)
def timedYangTextSchemaSourceSetBuilder = new TimedYangTextSchemaSourceSetBuilder()
@@ -52,7 +51,7 @@ class E2ENetworkSliceSpec extends Specification { mockYangTextSchemaSourceSetCache, mockCpsAdminService, mockCpsValidator,timedYangTextSchemaSourceSetBuilder)
def cpsDataServiceImpl = new CpsDataServiceImpl(mockDataStoreService, mockCpsAdminService,
- mockYangTextSchemaSourceSetCache, mockNotificationService, mockCpsValidator, timedYangParser, mockCpsDeltaService)
+ mockYangTextSchemaSourceSetCache, mockCpsValidator, timedYangParser, mockCpsDeltaService)
def dataspaceName = 'someDataspace'
def anchorName = 'someAnchor'
diff --git a/cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy new file mode 100644 index 0000000000..9f4e81aabc --- /dev/null +++ b/cps-service/src/test/groovy/org/onap/cps/config/AsyncConfigSpec.groovy @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.config + +import spock.lang.Specification + +class AsyncConfigSpec extends Specification { + + def objectUnderTest = new AsyncConfig() + + def 'Create Async Config and validate it'() { + when: 'we set some test properties to tune taskexecutor' + objectUnderTest.setCorePoolSize(5) + objectUnderTest.setMaxPoolSize(50) + objectUnderTest.setQueueCapacity(100) + objectUnderTest.setThreadNamePrefix('Test-') + objectUnderTest.setWaitForTasksToCompleteOnShutdown(true) + then: 'we can instantiate a Async Config object' + assert objectUnderTest != null + and: 'taskexector is configured with correct properties' + def tasExecutor = objectUnderTest.getThreadAsyncExecutorForNotification() + assert tasExecutor.properties['corePoolSize'] == 5 + assert tasExecutor.properties['maxPoolSize'] == 50 + assert tasExecutor.properties['queueCapacity'] == 100 + assert tasExecutor.properties['keepAliveSeconds'] == 60 + assert tasExecutor.properties['threadNamePrefix'] == 'Test-' + } +} diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy deleted file mode 100644 index 49f4bf3850..0000000000 --- a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdatedEventFactorySpec.groovy +++ /dev/null @@ -1,142 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2021-2022 Bell Canada. - * Modifications Copyright (c) 2022-2023 Nordix Foundation - * Modifications Copyright (C) 2023 TechMahindra Ltd. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification - -import org.onap.cps.spi.model.DataNode - -import java.time.OffsetDateTime -import java.time.format.DateTimeFormatter -import org.onap.cps.utils.DateTimeUtility -import org.onap.cps.utils.PrefixResolver -import org.onap.cps.api.CpsDataService -import org.onap.cps.event.model.Content -import org.onap.cps.event.model.Data -import org.onap.cps.spi.FetchDescendantsOption -import org.onap.cps.spi.model.Anchor -import org.onap.cps.spi.model.DataNodeBuilder -import org.springframework.util.StringUtils -import spock.lang.Specification - -class CpsDataUpdatedEventFactorySpec extends Specification { - - def mockCpsDataService = Mock(CpsDataService) - - def mockPrefixResolver = Mock(PrefixResolver) - - def objectUnderTest = new CpsDataUpdatedEventFactory(mockCpsDataService, mockPrefixResolver) - - def dateTimeFormat = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ' - - def 'Create a CPS data updated event successfully: #scenario'() { - given: 'an anchor which has been updated' - def anchor = new Anchor('my-anchorname', 'my-dataspace', 'my-schemaset-name') - and: 'cps data service returns the data node details' - def xpath = '/xpath' - def dataNode = new DataNodeBuilder().withXpath(xpath).withLeaves(['leafName': 'leafValue']).build() - mockCpsDataService.getDataNodes( - 'my-dataspace', 'my-anchorname', '/', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [dataNode] - when: 'CPS data updated event is created' - def cpsDataUpdatedEvent = objectUnderTest.createCpsDataUpdatedEvent(anchor, - DateTimeUtility.toOffsetDateTime(inputObservedTimestamp), Operation.CREATE) - then: 'CPS data updated event is created with correct envelope' - with(cpsDataUpdatedEvent) { - type == 'org.onap.cps.data-updated-event' - source == new URI('urn:cps:org.onap.cps') - schema == new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1') - StringUtils.hasText(id) - content != null - } - and: 'correct content' - with(cpsDataUpdatedEvent.content) { - assert isExpectedDateTimeFormat(observedTimestamp): "$observedTimestamp is not in $dateTimeFormat format" - if (inputObservedTimestamp != null) - assert observedTimestamp == inputObservedTimestamp - else - assert OffsetDateTime.now().minusSeconds(20).isBefore( - DateTimeUtility.toOffsetDateTime(observedTimestamp)) - assert anchorName == 'my-anchorname' - assert dataspaceName == 'my-dataspace' - assert schemaSetName == 'my-schemaset-name' - assert operation == Content.Operation.CREATE - assert data == new Data().withAdditionalProperty('xpath', ['leafName': 'leafValue']) - } - where: - scenario | inputObservedTimestamp - 'with observed timestamp -0400' | '2021-01-01T23:00:00.345-0400' - 'with observed timestamp +0400' | '2021-01-01T23:00:00.345+0400' - 'missing observed timestamp' | null - } - - def 'Create a delete CPS data updated event successfully'() { - given: 'an anchor which has been deleted' - def anchor = new Anchor('my-anchorname', 'my-dataspace', 'my-schemaset-name') - def deletionTimestamp = '2021-01-01T23:00:00.345-0400' - when: 'a delete root data node event is created' - def cpsDataUpdatedEvent = objectUnderTest.createCpsDataUpdatedEvent(anchor, - DateTimeUtility.toOffsetDateTime(deletionTimestamp), Operation.DELETE) - then: 'CPS data updated event is created with correct envelope' - with(cpsDataUpdatedEvent) { - type == 'org.onap.cps.data-updated-event' - source == new URI('urn:cps:org.onap.cps') - schema == new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1') - StringUtils.hasText(id) - content != null - } - and: 'correct content' - with(cpsDataUpdatedEvent.content) { - assert isExpectedDateTimeFormat(observedTimestamp): "$observedTimestamp is not in $dateTimeFormat format" - assert observedTimestamp == deletionTimestamp - assert anchorName == 'my-anchorname' - assert dataspaceName == 'my-dataspace' - assert schemaSetName == 'my-schemaset-name' - assert operation == Content.Operation.DELETE - assert data == null - } - } - - def 'Create CPS Data Event with URI Syntax Exception'() { - given: 'an anchor' - def anchor = new Anchor('my-anchorname', 'my-dataspace', 'my-schemaset-name') - and: 'a mocked data Node (collection)' - def mockDataNode = Mock(DataNode) - mockCpsDataService.getDataNodes(*_) >> [ mockDataNode ] - and: 'a URI syntax exception is thrown somewhere (using datanode as cannot manipulate hardcoded URIs' - def originalException = new URISyntaxException('input', 'reason', 0) - mockDataNode.getXpath() >> { throw originalException } - when: 'attempt to create data updated event' - objectUnderTest.createCpsDataUpdatedEvent(anchor, OffsetDateTime.now(), Operation.UPDATE) - then: 'the same exception is thrown up' - def thrownUp = thrown(URISyntaxException) - assert thrownUp == originalException - } - - def isExpectedDateTimeFormat(String observedTimestamp) { - try { - DateTimeFormatter.ofPattern(dateTimeFormat).parse(observedTimestamp) - } catch (DateTimeParseException) { - return false - } - return true - } - -} diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy deleted file mode 100644 index b60b38f054..0000000000 --- a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaPublisherSpecBase.groovy +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification - -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.kafka.config.TopicBuilder -import org.springframework.kafka.core.ConsumerFactory -import org.springframework.kafka.core.KafkaAdmin -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties -import org.springframework.kafka.listener.MessageListener -import org.springframework.kafka.test.utils.ContainerTestUtils -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.DynamicPropertyRegistry -import org.springframework.test.context.DynamicPropertySource -import spock.lang.Shared -import spock.lang.Specification - -@ContextConfiguration(classes = [KafkaAutoConfiguration, KafkaProducerListener, NotificationErrorHandler]) -@SpringBootTest -class KafkaPublisherSpecBase extends Specification { - - @Autowired - KafkaTemplate kafkaTemplate - - @Autowired - KafkaAdmin kafkaAdmin - - @Autowired - ConsumerFactory consumerFactory - - @Shared volatile topicCreated = false - @Shared consumedMessages = new ArrayList<>() - - def cpsEventTopic = 'cps-events' - - @DynamicPropertySource - static void registerKafkaProperties(DynamicPropertyRegistry registry) { - registry.add("spring.kafka.bootstrap-servers", KafkaTestContainerConfig::getBootstrapServers) - } - - def setup() { - // Kafka listener and topic should be created only once for a test-suite. - // We are also dependent on sprint context to achieve it, and can not execute it in setupSpec - if (!topicCreated) { - kafkaAdmin.createOrModifyTopics(TopicBuilder.name(cpsEventTopic).partitions(1).replicas(1).build()) - startListeningToTopic() - topicCreated = true - } - /* kafka message listener stores the messages to consumedMessages. - It is important to clear the list before each test case so that test cases can fetch the message from index '0'. - */ - consumedMessages.clear() - } - - def startListeningToTopic() { - ContainerProperties containerProperties = new ContainerProperties(cpsEventTopic) - containerProperties.setMessageListener([ - onMessage: { - record -> - consumedMessages.add(record.value()) - }] as MessageListener) - - ConcurrentMessageListenerContainer container = - new ConcurrentMessageListenerContainer<>( - consumerFactory, - containerProperties) - - container.start() - ContainerTestUtils.waitForAssignment(container, 1) - } - -} diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy deleted file mode 100644 index b07b31a35b..0000000000 --- a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy +++ /dev/null @@ -1,49 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification - -import org.testcontainers.containers.KafkaContainer -import org.testcontainers.utility.DockerImageName - -class KafkaTestContainerConfig { - - private static KafkaContainer kafkaContainer - - static { - getKafkaContainer() - } - - // Not the best performance but it is good enough for test case - private static synchronized KafkaContainer getKafkaContainer() { - if (kafkaContainer == null) { - kafkaContainer = new KafkaContainer(DockerImageName.parse("registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1").asCompatibleSubstituteFor("confluentinc/cp-kafka")) - .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false") - kafkaContainer.start() - Runtime.getRuntime().addShutdownHook(new Thread(kafkaContainer::stop)) - } - return kafkaContainer - } - - static String getBootstrapServers() { - getKafkaContainer() - return kafkaContainer.getBootstrapServers() - } - -} diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy deleted file mode 100644 index 89e305aedb..0000000000 --- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationErrorHandlerSpec.groovy +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification - -import ch.qos.logback.classic.Logger -import ch.qos.logback.classic.spi.ILoggingEvent -import ch.qos.logback.core.read.ListAppender -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach -import org.slf4j.LoggerFactory - -import spock.lang.Specification - -class NotificationErrorHandlerSpec extends Specification{ - - NotificationErrorHandler objectUnderTest = new NotificationErrorHandler() - def logWatcher = Spy(ListAppender<ILoggingEvent>) - - @BeforeEach - void setup() { - ((Logger) LoggerFactory.getLogger(NotificationErrorHandler.class)).addAppender(logWatcher); - logWatcher.start(); - } - - @AfterEach - void teardown() { - ((Logger) LoggerFactory.getLogger(NotificationErrorHandler.class)).detachAndStopAllAppenders(); - } - - def 'Logging exception via notification error handler #scenario'() { - when: 'exception #scenario occurs' - objectUnderTest.onException(exception, 'some context') - then: 'log output results contains the correct error details' - def logMessage = logWatcher.list[0].getFormattedMessage() - assert logMessage.contains('Failed to process') - assert logMessage.contains("Error cause: ${exptectedCauseString}") - assert logMessage.contains("Error context: [some context]") - where: - scenario | exception || exptectedCauseString - 'with cause' | new Exception('message') || 'message' - 'without cause' | new Exception('message', new RuntimeException('cause')) || 'java.lang.RuntimeException: cause' - } -} diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy deleted file mode 100644 index 6cd9ae1b20..0000000000 --- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationPublisherSpec.groovy +++ /dev/null @@ -1,91 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. - * Modifications Copyright (C) 2021-2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification - -import org.apache.kafka.clients.producer.ProducerRecord -import org.onap.cps.event.model.Content -import org.onap.cps.event.model.CpsDataUpdatedEvent -import org.spockframework.spring.SpringBean -import org.springframework.kafka.KafkaException -import org.springframework.kafka.core.KafkaTemplate -import spock.util.concurrent.PollingConditions - -class NotificationPublisherSpec extends KafkaPublisherSpecBase { - - @SpringBean - NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler()) - - @SpringBean - KafkaProducerListener spyKafkaProducerListener = Spy(new KafkaProducerListener<>(spyNotificationErrorHandler)) - - KafkaTemplate spyKafkaTemplate - NotificationPublisher objectUnderTest - - def myAnchorName = 'my-anchor' - def myDataspaceName = 'my-dataspace' - - def cpsDataUpdatedEvent = new CpsDataUpdatedEvent() - .withContent(new Content() - .withDataspaceName(myDataspaceName) - .withAnchorName(myAnchorName)) - - def setup() { - spyKafkaTemplate = Spy(kafkaTemplate) - objectUnderTest = new NotificationPublisher(spyKafkaTemplate, cpsEventTopic); - } - - def 'Sending event to message bus with correct message Key.'() { - - when: 'event is sent to publisher' - objectUnderTest.sendNotification(cpsDataUpdatedEvent) - kafkaTemplate.flush() - - then: 'event is sent to correct topic with the expected messageKey' - interaction { - def messageKey = myDataspaceName + "," + myAnchorName - 1 * spyKafkaTemplate.send(cpsEventTopic, messageKey, cpsDataUpdatedEvent) - } - and: 'received a successful response' - 1 * spyKafkaProducerListener.onSuccess(_ as ProducerRecord, _) - and: 'kafka consumer returns expected message' - def conditions = new PollingConditions(timeout: 60, initialDelay: 0, factor: 1) - conditions.eventually { - assert cpsDataUpdatedEvent == consumedMessages.get(0) - } - } - - def 'Handling of async errors from message bus.'() { - given: 'topic does not exist' - objectUnderTest.topicName = 'non-existing-topic' - - when: 'message to sent to a non-existing topic' - objectUnderTest.sendNotification(cpsDataUpdatedEvent) - kafkaTemplate.flush() - - then: 'error is thrown' - thrown KafkaException - and: 'error handler is called with exception details' - 1 * spyKafkaProducerListener.onError(_ as ProducerRecord, _, _ as Exception) - 1 * spyNotificationErrorHandler.onException(_ as String, _ as Exception, - _ as ProducerRecord, _) - } - -} diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy deleted file mode 100644 index f07f89b391..0000000000 --- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy +++ /dev/null @@ -1,158 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2021-2022 Bell Canada. - * Modifications Copyright (C) 2022-2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.notification - -import org.onap.cps.api.CpsAdminService -import org.onap.cps.config.AsyncConfig -import org.onap.cps.event.model.CpsDataUpdatedEvent -import org.onap.cps.spi.model.Anchor -import org.spockframework.spring.SpringBean -import org.spockframework.spring.SpringSpy -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.context.properties.EnableConfigurationProperties -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.ContextConfiguration -import spock.lang.Shared -import spock.lang.Specification - -import java.time.OffsetDateTime -import java.util.concurrent.TimeUnit - -@SpringBootTest -@EnableConfigurationProperties -@ContextConfiguration(classes = [NotificationProperties, NotificationService, NotificationErrorHandler, AsyncConfig]) -class NotificationServiceSpec extends Specification { - - @SpringSpy - NotificationProperties spyNotificationProperties - @SpringBean - NotificationPublisher mockNotificationPublisher = Mock() - @SpringBean - CpsDataUpdatedEventFactory mockCpsDataUpdatedEventFactory = Mock() - @SpringSpy - NotificationErrorHandler spyNotificationErrorHandler - @SpringBean - CpsAdminService mockCpsAdminService = Mock() - - @Autowired - NotificationService objectUnderTest - - @Shared - def dataspaceName = 'my-dataspace-published' - @Shared - def anchorName = 'my-anchorname' - @Shared - def anchor = new Anchor('my-anchorname', 'my-dataspace-published', 'my-schemaset-name') - def myObservedTimestamp = OffsetDateTime.now() - - def setup() { - mockCpsAdminService.getAnchor(dataspaceName, anchorName) >> anchor - } - - def 'Skip sending notification when disabled.'() { - given: 'notification is disabled' - spyNotificationProperties.isEnabled() >> false - when: 'dataUpdatedEvent is received' - objectUnderTest.processDataUpdatedEvent(anchor, '/', Operation.CREATE, myObservedTimestamp) - then: 'the notification is not sent' - 0 * mockNotificationPublisher.sendNotification(_) - } - - def 'Send notification when enabled: #scenario.'() { - given: 'notification is enabled' - spyNotificationProperties.isEnabled() >> true - and: 'an anchor is in dataspace where #scenario' - def anchor = new Anchor('my-anchorname', dataspaceName, 'my-schemaset-name') - and: 'event factory can create event successfully' - def cpsDataUpdatedEvent = new CpsDataUpdatedEvent() - mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, myObservedTimestamp, Operation.CREATE) >> cpsDataUpdatedEvent - when: 'dataUpdatedEvent is received' - def future = objectUnderTest.processDataUpdatedEvent(anchor, '/', Operation.CREATE, myObservedTimestamp) - and: 'wait for async processing to complete' - future.get(10, TimeUnit.SECONDS) - then: 'async process completed successfully' - future.isDone() - and: 'notification is sent' - expectedSendNotificationCount * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent) - where: - scenario | dataspaceName || expectedSendNotificationCount - 'dataspace name does not match filter' | 'does-not-match-pattern' || 0 - 'dataspace name matches filter' | 'my-dataspace-published' || 1 - } - - def '#scenario are changed with xpath #xpath and operation #operation'() { - given: 'notification is enabled' - spyNotificationProperties.isEnabled() >> true - and: 'event factory creates event if operation is #operation' - def cpsDataUpdatedEvent = new CpsDataUpdatedEvent() - mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, myObservedTimestamp, expectedOperationInEvent) >> - cpsDataUpdatedEvent - when: 'dataUpdatedEvent is received for #xpath' - def future = objectUnderTest.processDataUpdatedEvent(anchor, xpath, operation, myObservedTimestamp) - and: 'wait for async processing to complete' - future.get(10, TimeUnit.SECONDS) - then: 'async process completed successfully' - future.isDone() - and: 'notification is sent' - 1 * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent) - where: - scenario | xpath | operation || expectedOperationInEvent - 'Same event is sent when root nodes' | '' | Operation.CREATE || Operation.CREATE - 'Same event is sent when root nodes' | '' | Operation.UPDATE || Operation.UPDATE - 'Same event is sent when root nodes' | '' | Operation.DELETE || Operation.DELETE - 'Same event is sent when root nodes' | '/' | Operation.CREATE || Operation.CREATE - 'Same event is sent when root nodes' | '/' | Operation.UPDATE || Operation.UPDATE - 'Same event is sent when root nodes' | '/' | Operation.DELETE || Operation.DELETE - 'Same event is sent when container nodes' | '/parent' | Operation.CREATE || Operation.CREATE - 'Same event is sent when container nodes' | '/parent' | Operation.UPDATE || Operation.UPDATE - 'Same event is sent when container nodes' | '/parent' | Operation.DELETE || Operation.DELETE - 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.CREATE || Operation.UPDATE - 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.UPDATE || Operation.UPDATE - 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.DELETE || Operation.UPDATE - } - - def 'Error handling in notification service.'() { - given: 'notification is enabled' - spyNotificationProperties.isEnabled() >> true - and: 'event factory can not create event successfully' - mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor, myObservedTimestamp, Operation.CREATE) >> - { throw new Exception("Could not create event") } - when: 'event is sent for processing' - def future = objectUnderTest.processDataUpdatedEvent(anchor, '/', Operation.CREATE, myObservedTimestamp) - and: 'wait for async processing to complete' - future.get(10, TimeUnit.SECONDS) - then: 'async process completed successfully' - future.isDone() - and: 'error is handled and not thrown to caller' - notThrown Exception - 1 * spyNotificationErrorHandler.onException(_, _, _, '/', Operation.CREATE) - } - - def 'Disabled Notification services'() { - given: 'a notification service that is disabled' - spyNotificationProperties.enabled >> false - NotificationService notificationService = new NotificationService(spyNotificationProperties, mockNotificationPublisher, mockCpsDataUpdatedEventFactory, spyNotificationErrorHandler, mockCpsAdminService) - notificationService.init() - expect: 'it will not send notifications' - assert notificationService.shouldSendNotification('') == false - } -} |