diff options
author | rajesh.kumar <rk00747546@techmahindra.com> | 2024-02-23 20:29:59 +0530 |
---|---|---|
committer | Rajesh kumar <rk00747546@techmahindra.com> | 2024-04-30 09:20:38 +0000 |
commit | eb3a808eaad99ccccacd1b852034ec3bf02ed061 (patch) | |
tree | daca3b8ad8314ab8b9192470dcf4b0ecc2ec4d3d /cps-service/src/main/java/org | |
parent | ab9ebe9ebef202c2fc3f1b758dc73c02b17e1b06 (diff) |
Add Notification support in cps core
Add notification support using cloud events
Issue-ID:CPS-2068
Change-Id: I56c34400dc73c71b936a51260efd240223babacd
Signed-off-by: rajesh.kumar <rk00747546@techmahindra.com>
Diffstat (limited to 'cps-service/src/main/java/org')
3 files changed, 209 insertions, 1 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index b3f42279df..f556f40647 100644 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -3,7 +3,7 @@ * Copyright (C) 2021-2024 Nordix Foundation * Modifications Copyright (C) 2020-2022 Bell Canada. * Modifications Copyright (C) 2021 Pantheon.tech - * Modifications Copyright (C) 2022-2023 TechMahindra Ltd. + * Modifications Copyright (C) 2022-2024 TechMahindra Ltd. * Modifications Copyright (C) 2022 Deutsche Telekom AG * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -39,6 +39,8 @@ import org.onap.cps.api.CpsAnchorService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsDeltaService; import org.onap.cps.cpspath.parser.CpsPathUtil; +import org.onap.cps.events.CpsDataUpdateEventsService; +import org.onap.cps.events.model.Data.Operation; import org.onap.cps.spi.CpsDataPersistenceService; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.exceptions.DataValidationException; @@ -61,7 +63,9 @@ public class CpsDataServiceImpl implements CpsDataService { private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L; private final CpsDataPersistenceService cpsDataPersistenceService; + private final CpsDataUpdateEventsService cpsDataUpdateEventsService; private final CpsAnchorService cpsAnchorService; + private final CpsValidator cpsValidator; private final YangParser yangParser; private final CpsDeltaService cpsDeltaService; @@ -81,6 +85,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType); cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes); + sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.CREATE, observedTimestamp); } @Override @@ -99,6 +104,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType); cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.CREATE, observedTimestamp); } @Override @@ -116,6 +122,7 @@ public class CpsDataServiceImpl implements CpsDataService { cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath, listElementDataNodeCollection); } + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -151,6 +158,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream() .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves)); cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -167,6 +175,7 @@ public class CpsDataServiceImpl implements CpsDataService { for (final DataNode dataNodeUpdate : dataNodeUpdates) { processDataNodeUpdate(anchor, dataNodeUpdate); } + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -216,6 +225,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON); cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -228,6 +238,8 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData); cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes); + nodesJsonData.keySet().forEach(nodeXpath -> + sendDataUpdatedEvent(anchor, nodeXpath, Operation.UPDATE, observedTimestamp)); } @Override @@ -248,7 +260,9 @@ public class CpsDataServiceImpl implements CpsDataService { public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath, final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -258,6 +272,8 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp); } @Override @@ -267,8 +283,12 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + dataNodeXpaths.forEach(dataNodeXpath -> + sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp)); } + @Override @Timed(value = "cps.data.service.datanode.delete.anchor", description = "Time taken to delete all datanodes for an anchor") @@ -276,6 +296,8 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp); } @Override @@ -286,6 +308,9 @@ public class CpsDataServiceImpl implements CpsDataService { cpsValidator.validateNameCharacters(dataspaceName); cpsValidator.validateNameCharacters(anchorNames); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames); + for (final Anchor anchor : cpsAnchorService.getAnchors(dataspaceName, anchorNames)) { + sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp); + } } @Override @@ -295,6 +320,8 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + sendDataUpdatedEvent(anchor, listNodeXpath, Operation.DELETE, observedTimestamp); } private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) { @@ -345,4 +372,12 @@ public class CpsDataServiceImpl implements CpsDataService { } } + private void sendDataUpdatedEvent(final Anchor anchor, final String xpath, + final Operation operation, final OffsetDateTime observedTimestamp) { + try { + cpsDataUpdateEventsService.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp); + } catch (final Exception exception) { + log.error("Failed to send message to notification service", exception); + } + } } diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java new file mode 100644 index 0000000000..e3315c9aba --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 TechMahindra Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.events; + +import io.cloudevents.CloudEvent; +import io.micrometer.core.annotation.Timed; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.events.model.CpsDataUpdatedEvent; +import org.onap.cps.events.model.Data; +import org.onap.cps.events.model.Data.Operation; +import org.onap.cps.spi.model.Anchor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CpsDataUpdateEventsService { + + private final EventsPublisher<CpsDataUpdatedEvent> eventsPublisher; + + @Value("${app.cps.data-updated.topic:cps-data-updated-events}") + private String topicName; + + @Value("${notification.enabled:false}") + private boolean notificationsEnabled; + + /** + * Publish the cps data update event with header to the public topic. + * + * @param anchor Anchor of the updated data + * @param xpath xpath of the updated data + * @param operation operation performed on the data + * @param observedTimestamp timestamp when data was updated. + */ + @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event") + public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath, + final Operation operation, final OffsetDateTime observedTimestamp) { + if (notificationsEnabled) { + final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor, + observedTimestamp, xpath, operation); + final String updateEventId = anchor.getDataspaceName() + ":" + anchor.getName(); + final Map<String, String> extensions = createUpdateEventExtensions(updateEventId); + final CloudEvent cpsDataUpdatedEventAsCloudEvent = + CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent) + .extensions(extensions).build().asCloudEvent(); + eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent); + } else { + log.debug("Notifications disabled."); + } + } + + private CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor, final OffsetDateTime observedTimestamp, + final String xpath, + final Operation rootNodeOperation) { + final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent(); + final Data updateEventData = new Data(); + updateEventData.setObservedTimestamp(observedTimestamp.toString()); + updateEventData.setDataspaceName(anchor.getDataspaceName()); + updateEventData.setAnchorName(anchor.getName()); + updateEventData.setSchemaSetName(anchor.getSchemaSetName()); + updateEventData.setOperation(getRootNodeOperation(xpath, rootNodeOperation)); + updateEventData.setXpath(xpath); + cpsDataUpdatedEvent.setData(updateEventData); + return cpsDataUpdatedEvent; + } + + private Map<String, String> createUpdateEventExtensions(final String eventKey) { + final Map<String, String> extensions = new HashMap<>(); + extensions.put("correlationid", eventKey); + return extensions; + } + + private Operation getRootNodeOperation(final String xpath, final Operation operation) { + return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE; + } + + private static boolean isRootXpath(final String xpath) { + return "/".equals(xpath) || "".equals(xpath); + } + + private static boolean isRootContainerNodeXpath(final String xpath) { + return 0 == xpath.lastIndexOf('/'); + } +} diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java new file mode 100644 index 0000000000..c19abc1764 --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 TechMahindra Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.events; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import java.net.URI; +import java.util.Map; +import java.util.UUID; +import lombok.Builder; +import org.apache.commons.lang3.StringUtils; +import org.onap.cps.utils.JsonObjectMapper; + +@Builder +public class CpsEvent { + + private Object data; + private Map<String, String> extensions; + private String type; + @Builder.Default + private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0"; + @Builder.Default + private static final String CLOUD_EVENT_SOURCE = "CPS"; + + private final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()); + + /** + * Creates ncmp cloud event with provided attributes. + * + * @return Cloud Event + */ + + public CloudEvent asCloudEvent() { + final CloudEventBuilder cloudEventBuilder = io.cloudevents.core.builder + .CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create(CLOUD_EVENT_SOURCE)) + .withType(type) + .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1)) + .withData(jsonObjectMapper.asJsonBytes(data)); + extensions.entrySet().stream() + .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue())) + .forEach(extensionEntry -> + cloudEventBuilder.withExtension(extensionEntry.getKey(), extensionEntry.getValue())); + return cloudEventBuilder.build(); + } +} |