summaryrefslogtreecommitdiffstats
path: root/cps-service/src/main/java
diff options
context:
space:
mode:
authorrajesh.kumar <rk00747546@techmahindra.com>2024-02-23 20:29:59 +0530
committerRajesh kumar <rk00747546@techmahindra.com>2024-04-30 09:20:38 +0000
commiteb3a808eaad99ccccacd1b852034ec3bf02ed061 (patch)
treedaca3b8ad8314ab8b9192470dcf4b0ecc2ec4d3d /cps-service/src/main/java
parentab9ebe9ebef202c2fc3f1b758dc73c02b17e1b06 (diff)
Add Notification support in cps core
Add notification support using cloud events Issue-ID:CPS-2068 Change-Id: I56c34400dc73c71b936a51260efd240223babacd Signed-off-by: rajesh.kumar <rk00747546@techmahindra.com>
Diffstat (limited to 'cps-service/src/main/java')
-rw-r--r--cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java37
-rw-r--r--cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java107
-rw-r--r--cps-service/src/main/java/org/onap/cps/events/CpsEvent.java66
3 files changed, 209 insertions, 1 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
index b3f42279df..f556f40647 100644
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
@@ -3,7 +3,7 @@
* Copyright (C) 2021-2024 Nordix Foundation
* Modifications Copyright (C) 2020-2022 Bell Canada.
* Modifications Copyright (C) 2021 Pantheon.tech
- * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
+ * Modifications Copyright (C) 2022-2024 TechMahindra Ltd.
* Modifications Copyright (C) 2022 Deutsche Telekom AG
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -39,6 +39,8 @@ import org.onap.cps.api.CpsAnchorService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsDeltaService;
import org.onap.cps.cpspath.parser.CpsPathUtil;
+import org.onap.cps.events.CpsDataUpdateEventsService;
+import org.onap.cps.events.model.Data.Operation;
import org.onap.cps.spi.CpsDataPersistenceService;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.exceptions.DataValidationException;
@@ -61,7 +63,9 @@ public class CpsDataServiceImpl implements CpsDataService {
private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
private final CpsDataPersistenceService cpsDataPersistenceService;
+ private final CpsDataUpdateEventsService cpsDataUpdateEventsService;
private final CpsAnchorService cpsAnchorService;
+
private final CpsValidator cpsValidator;
private final YangParser yangParser;
private final CpsDeltaService cpsDeltaService;
@@ -81,6 +85,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.CREATE, observedTimestamp);
}
@Override
@@ -99,6 +104,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.CREATE, observedTimestamp);
}
@Override
@@ -116,6 +122,7 @@ public class CpsDataServiceImpl implements CpsDataService {
cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollection);
}
+ sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
}
@Override
@@ -151,6 +158,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
.collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
}
@Override
@@ -167,6 +175,7 @@ public class CpsDataServiceImpl implements CpsDataService {
for (final DataNode dataNodeUpdate : dataNodeUpdates) {
processDataNodeUpdate(anchor, dataNodeUpdate);
}
+ sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
}
@Override
@@ -216,6 +225,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
}
@Override
@@ -228,6 +238,8 @@ public class CpsDataServiceImpl implements CpsDataService {
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
+ nodesJsonData.keySet().forEach(nodeXpath ->
+ sendDataUpdatedEvent(anchor, nodeXpath, Operation.UPDATE, observedTimestamp));
}
@Override
@@ -248,7 +260,9 @@ public class CpsDataServiceImpl implements CpsDataService {
public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
+ final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
}
@Override
@@ -258,6 +272,8 @@ public class CpsDataServiceImpl implements CpsDataService {
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
+ final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+ sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp);
}
@Override
@@ -267,8 +283,12 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
+ final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+ dataNodeXpaths.forEach(dataNodeXpath ->
+ sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp));
}
+
@Override
@Timed(value = "cps.data.service.datanode.delete.anchor",
description = "Time taken to delete all datanodes for an anchor")
@@ -276,6 +296,8 @@ public class CpsDataServiceImpl implements CpsDataService {
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
+ final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
}
@Override
@@ -286,6 +308,9 @@ public class CpsDataServiceImpl implements CpsDataService {
cpsValidator.validateNameCharacters(dataspaceName);
cpsValidator.validateNameCharacters(anchorNames);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
+ for (final Anchor anchor : cpsAnchorService.getAnchors(dataspaceName, anchorNames)) {
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
+ }
}
@Override
@@ -295,6 +320,8 @@ public class CpsDataServiceImpl implements CpsDataService {
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
+ final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+ sendDataUpdatedEvent(anchor, listNodeXpath, Operation.DELETE, observedTimestamp);
}
private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
@@ -345,4 +372,12 @@ public class CpsDataServiceImpl implements CpsDataService {
}
}
+ private void sendDataUpdatedEvent(final Anchor anchor, final String xpath,
+ final Operation operation, final OffsetDateTime observedTimestamp) {
+ try {
+ cpsDataUpdateEventsService.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
+ } catch (final Exception exception) {
+ log.error("Failed to send message to notification service", exception);
+ }
+ }
}
diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java
new file mode 100644
index 0000000000..e3315c9aba
--- /dev/null
+++ b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 TechMahindra Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events;
+
+import io.cloudevents.CloudEvent;
+import io.micrometer.core.annotation.Timed;
+import java.time.OffsetDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.model.CpsDataUpdatedEvent;
+import org.onap.cps.events.model.Data;
+import org.onap.cps.events.model.Data.Operation;
+import org.onap.cps.spi.model.Anchor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CpsDataUpdateEventsService {
+
+ private final EventsPublisher<CpsDataUpdatedEvent> eventsPublisher;
+
+ @Value("${app.cps.data-updated.topic:cps-data-updated-events}")
+ private String topicName;
+
+ @Value("${notification.enabled:false}")
+ private boolean notificationsEnabled;
+
+ /**
+ * Publish the cps data update event with header to the public topic.
+ *
+ * @param anchor Anchor of the updated data
+ * @param xpath xpath of the updated data
+ * @param operation operation performed on the data
+ * @param observedTimestamp timestamp when data was updated.
+ */
+ @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event")
+ public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath,
+ final Operation operation, final OffsetDateTime observedTimestamp) {
+ if (notificationsEnabled) {
+ final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor,
+ observedTimestamp, xpath, operation);
+ final String updateEventId = anchor.getDataspaceName() + ":" + anchor.getName();
+ final Map<String, String> extensions = createUpdateEventExtensions(updateEventId);
+ final CloudEvent cpsDataUpdatedEventAsCloudEvent =
+ CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
+ .extensions(extensions).build().asCloudEvent();
+ eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+ } else {
+ log.debug("Notifications disabled.");
+ }
+ }
+
+ private CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor, final OffsetDateTime observedTimestamp,
+ final String xpath,
+ final Operation rootNodeOperation) {
+ final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
+ final Data updateEventData = new Data();
+ updateEventData.setObservedTimestamp(observedTimestamp.toString());
+ updateEventData.setDataspaceName(anchor.getDataspaceName());
+ updateEventData.setAnchorName(anchor.getName());
+ updateEventData.setSchemaSetName(anchor.getSchemaSetName());
+ updateEventData.setOperation(getRootNodeOperation(xpath, rootNodeOperation));
+ updateEventData.setXpath(xpath);
+ cpsDataUpdatedEvent.setData(updateEventData);
+ return cpsDataUpdatedEvent;
+ }
+
+ private Map<String, String> createUpdateEventExtensions(final String eventKey) {
+ final Map<String, String> extensions = new HashMap<>();
+ extensions.put("correlationid", eventKey);
+ return extensions;
+ }
+
+ private Operation getRootNodeOperation(final String xpath, final Operation operation) {
+ return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE;
+ }
+
+ private static boolean isRootXpath(final String xpath) {
+ return "/".equals(xpath) || "".equals(xpath);
+ }
+
+ private static boolean isRootContainerNodeXpath(final String xpath) {
+ return 0 == xpath.lastIndexOf('/');
+ }
+}
diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java
new file mode 100644
index 0000000000..c19abc1764
--- /dev/null
+++ b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 TechMahindra Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+import lombok.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.cps.utils.JsonObjectMapper;
+
+@Builder
+public class CpsEvent {
+
+ private Object data;
+ private Map<String, String> extensions;
+ private String type;
+ @Builder.Default
+ private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0";
+ @Builder.Default
+ private static final String CLOUD_EVENT_SOURCE = "CPS";
+
+ private final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper());
+
+ /**
+ * Creates ncmp cloud event with provided attributes.
+ *
+ * @return Cloud Event
+ */
+
+ public CloudEvent asCloudEvent() {
+ final CloudEventBuilder cloudEventBuilder = io.cloudevents.core.builder
+ .CloudEventBuilder.v1()
+ .withId(UUID.randomUUID().toString())
+ .withSource(URI.create(CLOUD_EVENT_SOURCE))
+ .withType(type)
+ .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1))
+ .withData(jsonObjectMapper.asJsonBytes(data));
+ extensions.entrySet().stream()
+ .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue()))
+ .forEach(extensionEntry ->
+ cloudEventBuilder.withExtension(extensionEntry.getKey(), extensionEntry.getValue()));
+ return cloudEventBuilder.build();
+ }
+}