summaryrefslogtreecommitdiffstats
path: root/cps-service/src/main
diff options
context:
space:
mode:
authorToine Siebelink <toine.siebelink@est.tech>2023-12-18 15:16:26 +0000
committerGerrit Code Review <gerrit@onap.org>2023-12-18 15:16:26 +0000
commit597d4ef72e5388092cc80ae662af6711f735369f (patch)
tree4ad76262d8ff886c96fe7fe5cbcbfa830f9004d4 /cps-service/src/main
parent66d033e5bb4317d02d343fc3a5f50dde2d4097d6 (diff)
parentdb6b8f845ac72da59c12831ae8c7efa180f9ace2 (diff)
Merge "Remove Notification code for updated events"
Diffstat (limited to 'cps-service/src/main')
-rwxr-xr-xcps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java40
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java108
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java52
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java40
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java43
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java66
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationService.java124
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/Operation.java27
8 files changed, 0 insertions, 500 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
index e74e0ad249..6672d6883f 100755
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
@@ -24,10 +24,6 @@
package org.onap.cps.api.impl;
-import static org.onap.cps.notification.Operation.CREATE;
-import static org.onap.cps.notification.Operation.DELETE;
-import static org.onap.cps.notification.Operation.UPDATE;
-
import io.micrometer.core.annotation.Timed;
import java.io.Serializable;
import java.time.OffsetDateTime;
@@ -43,8 +39,6 @@ import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsDeltaService;
import org.onap.cps.cpspath.parser.CpsPathUtil;
-import org.onap.cps.notification.NotificationService;
-import org.onap.cps.notification.Operation;
import org.onap.cps.spi.CpsDataPersistenceService;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.exceptions.DataValidationException;
@@ -70,7 +64,6 @@ public class CpsDataServiceImpl implements CpsDataService {
private final CpsDataPersistenceService cpsDataPersistenceService;
private final CpsAdminService cpsAdminService;
private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
- private final NotificationService notificationService;
private final CpsValidator cpsValidator;
private final TimedYangParser timedYangParser;
private final CpsDeltaService cpsDeltaService;
@@ -90,7 +83,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
- processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
}
@Override
@@ -109,7 +101,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
}
@Override
@@ -127,7 +118,6 @@ public class CpsDataServiceImpl implements CpsDataService {
cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollection);
}
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -141,7 +131,6 @@ public class CpsDataServiceImpl implements CpsDataService {
buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollections);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -177,7 +166,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
.collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -194,7 +182,6 @@ public class CpsDataServiceImpl implements CpsDataService {
for (final DataNode dataNodeUpdate : dataNodeUpdates) {
processDataNodeUpdate(anchor, dataNodeUpdate);
}
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -244,7 +231,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -257,8 +243,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
- nodesJsonData.keySet().forEach(nodeXpath ->
- processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
}
@Override
@@ -279,9 +263,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
}
@Override
@@ -290,9 +272,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
- processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
}
@Override
@@ -302,9 +282,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
- dataNodeXpaths.forEach(dataNodeXpath ->
- processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
}
@Override
@@ -313,8 +290,6 @@ public class CpsDataServiceImpl implements CpsDataService {
public void deleteDataNodes(final String dataspaceName, final String anchorName,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
- processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
}
@@ -325,9 +300,6 @@ public class CpsDataServiceImpl implements CpsDataService {
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName);
cpsValidator.validateNameCharacters(anchorNames);
- for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
- processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
- }
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
}
@@ -337,9 +309,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
- final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
- processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
}
private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
@@ -385,16 +355,6 @@ public class CpsDataServiceImpl implements CpsDataService {
.collect(Collectors.toList());
}
- private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
- final Operation operation, final OffsetDateTime observedTimestamp) {
- try {
- notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
- } catch (final Exception exception) {
- //If async message can't be queued for notification service, the initial request should not fail.
- log.error("Failed to send message to notification service", exception);
- }
- }
-
private SchemaContext getSchemaContext(final Anchor anchor) {
return yangTextSchemaSourceSetCache
.get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
deleted file mode 100644
index 696fd60f8c..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021-2022 Bell Canada.
- * Modifications Copyright (c) 2022-2023 Nordix Foundation
- * Modifications Copyright (C) 2023 TechMahindra Ltd.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.UUID;
-import lombok.AllArgsConstructor;
-import lombok.SneakyThrows;
-import org.onap.cps.api.CpsDataService;
-import org.onap.cps.event.model.Content;
-import org.onap.cps.event.model.CpsDataUpdatedEvent;
-import org.onap.cps.event.model.Data;
-import org.onap.cps.spi.FetchDescendantsOption;
-import org.onap.cps.spi.model.Anchor;
-import org.onap.cps.spi.model.DataNode;
-import org.onap.cps.utils.DataMapUtils;
-import org.onap.cps.utils.PrefixResolver;
-import org.springframework.context.annotation.Lazy;
-import org.springframework.stereotype.Component;
-
-@Component
-@AllArgsConstructor(onConstructor = @__(@Lazy))
-public class CpsDataUpdatedEventFactory {
-
- private static final DateTimeFormatter DATE_TIME_FORMATTER =
- DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
-
- @Lazy
- private final CpsDataService cpsDataService;
-
- @Lazy
- private final PrefixResolver prefixResolver;
-
- /**
- * Generates CPS Data Updated event. If observedTimestamp is not provided, then current timestamp is used.
- *
- * @param anchor anchor
- * @param observedTimestamp observedTimestamp
- * @param operation operation
- * @return CpsDataUpdatedEvent
- */
- public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor,
- final OffsetDateTime observedTimestamp, final Operation operation) {
- final var dataNode = (operation == Operation.DELETE) ? null :
- cpsDataService.getDataNodes(anchor.getDataspaceName(), anchor.getName(),
- "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS).iterator().next();
- return toCpsDataUpdatedEvent(anchor, dataNode, observedTimestamp, operation);
- }
-
- @SneakyThrows(URISyntaxException.class)
- private CpsDataUpdatedEvent toCpsDataUpdatedEvent(final Anchor anchor,
- final DataNode dataNode,
- final OffsetDateTime observedTimestamp,
- final Operation operation) {
- final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
- cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode, observedTimestamp, operation));
- cpsDataUpdatedEvent.withId(UUID.randomUUID().toString());
- cpsDataUpdatedEvent.withSchema(new URI("urn:cps:org.onap.cps:data-updated-event-schema:v1"));
- cpsDataUpdatedEvent.withSource(new URI("urn:cps:org.onap.cps"));
- cpsDataUpdatedEvent.withType("org.onap.cps.data-updated-event");
- return cpsDataUpdatedEvent;
- }
-
- private Data createData(final DataNode dataNode, final String prefix) {
- final Data data = new Data();
- DataMapUtils.toDataMapWithIdentifier(dataNode, prefix).forEach(data::setAdditionalProperty);
- return data;
- }
-
- private Content createContent(final Anchor anchor, final DataNode dataNode,
- final OffsetDateTime observedTimestamp, final Operation operation) {
- final var content = new Content();
- content.withAnchorName(anchor.getName());
- content.withDataspaceName(anchor.getDataspaceName());
- content.withSchemaSetName(anchor.getSchemaSetName());
- content.withOperation(Content.Operation.fromValue(operation.name()));
- content.withObservedTimestamp(
- DATE_TIME_FORMATTER.format(observedTimestamp == null ? OffsetDateTime.now() : observedTimestamp));
- if (dataNode != null) {
- final String prefix = prefixResolver.getPrefix(anchor, dataNode.getXpath());
- content.withData(createData(dataNode, prefix));
- }
- return content;
- }
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java b/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java
deleted file mode 100644
index f4b68c0699..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/KafkaProducerListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.springframework.kafka.support.ProducerListener;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class KafkaProducerListener<K, V> implements ProducerListener<K, V> {
-
- private NotificationErrorHandler notificationErrorHandler;
-
- public KafkaProducerListener(final NotificationErrorHandler notificationErrorHandler) {
- this.notificationErrorHandler = notificationErrorHandler;
- }
-
- @Override
- public void onSuccess(final ProducerRecord<K, V> producerRecord, final RecordMetadata recordMetadata) {
- log.debug("Message sent to event-bus topic :'{}' with body : {} ", producerRecord.topic(),
- producerRecord.value());
- }
-
- @Override
- public void onError(final ProducerRecord<K, V> producerRecord,
- final RecordMetadata recordMetadata,
- final Exception exception) {
- notificationErrorHandler.onException("Failed to send message to message bus",
- exception, producerRecord, recordMetadata);
- }
-
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java
deleted file mode 100644
index eef028d5f3..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationErrorHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class NotificationErrorHandler {
-
- void onException(final Exception exception, final Object... context) {
- onException("Failed to process", exception, context);
- }
-
- void onException(final String message, final Exception exception, final Object... context) {
- log.error("{} \n Error cause: {} \n Error context: {}",
- message,
- exception.getCause() != null ? exception.getCause().toString() : exception.getMessage(),
- context,
- exception);
- }
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java
deleted file mode 100644
index b8a7144b3d..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import jakarta.validation.constraints.NotNull;
-import java.util.Collections;
-import java.util.Map;
-import lombok.Data;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-import org.springframework.validation.annotation.Validated;
-
-@ConfigurationProperties(prefix = "notification.data-updated")
-@Component
-@Data
-@Validated
-public class NotificationProperties {
-
- @NotNull
- private String topic;
- private Map<String, String> filters = Collections.emptyMap();
-
- @Value("${notification.enabled:true}")
- private boolean enabled;
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
deleted file mode 100644
index 2d87488245..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021 Bell Canada.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import lombok.extern.slf4j.Slf4j;
-import org.checkerframework.checker.nullness.qual.NonNull;
-import org.onap.cps.event.model.CpsDataUpdatedEvent;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class NotificationPublisher {
-
- private KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate;
- private String topicName;
-
- /**
- * Create an instance of Notification Publisher.
- *
- * @param kafkaTemplate kafkaTemplate is send event using kafka
- * @param topicName topic, to which cpsDataUpdatedEvent is sent, is provided by setting
- * 'notification.data-updated.topic' in the application properties
- */
- @Autowired
- public NotificationPublisher(
- final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
- final @Value("${notification.data-updated.topic}") String topicName) {
- this.kafkaTemplate = kafkaTemplate;
- this.topicName = topicName;
- }
-
- /**
- * Send event to Kafka with correct message key.
- *
- * @param cpsDataUpdatedEvent event to be sent to kafka
- */
- public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
- final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
- + cpsDataUpdatedEvent.getContent().getAnchorName();
- log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
- messageKey, cpsDataUpdatedEvent);
- kafkaTemplate.send(topicName, messageKey, cpsDataUpdatedEvent);
- }
-
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
deleted file mode 100644
index c29d042293..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2021-2022 Bell Canada.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-import jakarta.annotation.PostConstruct;
-import java.time.OffsetDateTime;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.CpsAdminService;
-import org.onap.cps.spi.model.Anchor;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Service;
-
-@Service
-@Slf4j
-@RequiredArgsConstructor
-public class NotificationService {
-
- private final NotificationProperties notificationProperties;
- private final NotificationPublisher notificationPublisher;
- private final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
- private final NotificationErrorHandler notificationErrorHandler;
- private final CpsAdminService cpsAdminService;
- private List<Pattern> dataspacePatterns;
-
- @PostConstruct
- public void init() {
- log.info("Notification Properties {}", notificationProperties);
- this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties);
- }
-
- private List<Pattern> getDataspaceFilterPatterns(final NotificationProperties notificationProperties) {
- if (notificationProperties.isEnabled()) {
- return Arrays.stream(notificationProperties.getFilters()
- .getOrDefault("enabled-dataspaces", "")
- .split(","))
- .map(filterPattern -> Pattern.compile(filterPattern, Pattern.CASE_INSENSITIVE))
- .collect(Collectors.toList());
- } else {
- return Collections.emptyList();
- }
- }
-
- /**
- * Process Data Updated Event and publishes the notification.
- *
- * @param anchor anchor
- * @param xpath xpath of changed data node
- * @param operation operation
- * @param observedTimestamp observedTimestamp
- * @return future
- */
- @Async("notificationExecutor")
- public Future<Void> processDataUpdatedEvent(final Anchor anchor, final String xpath, final Operation operation,
- final OffsetDateTime observedTimestamp) {
-
- log.debug("process data updated event for anchor '{}'", anchor);
- try {
- if (shouldSendNotification(anchor.getDataspaceName())) {
- final var cpsDataUpdatedEvent =
- cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor,
- observedTimestamp, getRootNodeOperation(xpath, operation));
- log.debug("data updated event to be published {}", cpsDataUpdatedEvent);
- notificationPublisher.sendNotification(cpsDataUpdatedEvent);
- }
- } catch (final Exception exception) {
- /* All the exceptions are handled to not to propagate it to caller.
- CPS operation should not fail if sending event fails for any reason.
- */
- notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
- exception, anchor, xpath, operation);
- }
- return CompletableFuture.completedFuture(null);
- }
-
- /*
- Add more complex rules based on dataspace and anchor later
- */
- private boolean shouldSendNotification(final String dataspaceName) {
-
- return notificationProperties.isEnabled()
- && dataspacePatterns.stream()
- .anyMatch(pattern -> pattern.matcher(dataspaceName).find());
- }
-
- private Operation getRootNodeOperation(final String xpath, final Operation operation) {
- return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE;
- }
-
- private static boolean isRootXpath(final String xpath) {
- return "/".equals(xpath) || "".equals(xpath);
- }
-
- private static boolean isRootContainerNodeXpath(final String xpath) {
- return 0 == xpath.lastIndexOf('/');
- }
-
-}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/Operation.java b/cps-service/src/main/java/org/onap/cps/notification/Operation.java
deleted file mode 100644
index 83e1ccf79f..0000000000
--- a/cps-service/src/main/java/org/onap/cps/notification/Operation.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2022 Bell Canada.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.notification;
-
-public enum Operation {
- CREATE,
- UPDATE,
- DELETE
-}