diff options
Diffstat (limited to 'cps-service/src/main/java')
4 files changed, 83 insertions, 58 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java b/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java index 2583e9905b..31a7517340 100644 --- a/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java +++ b/cps-service/src/main/java/org/onap/cps/api/CpsDataService.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2020 Nordix Foundation * Modifications Copyright (C) 2021 Pantheon.tech + * Modifications Copyright (C) 2021 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,7 @@ package org.onap.cps.api; +import java.time.OffsetDateTime; import org.checkerframework.checker.nullness.qual.NonNull; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.model.DataNode; @@ -36,8 +38,10 @@ public interface CpsDataService { * @param dataspaceName dataspace name * @param anchorName anchor name * @param jsonData json data + * @param observedTimestamp observedTimestamp */ - void saveData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String jsonData); + void saveData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String jsonData, + OffsetDateTime observedTimestamp); /** * Persists child data fragment under existing data node for the given anchor and dataspace. @@ -46,21 +50,23 @@ public interface CpsDataService { * @param anchorName anchor name * @param parentNodeXpath parent node xpath * @param jsonData json data + * @param observedTimestamp observedTimestamp */ void saveData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String parentNodeXpath, - @NonNull String jsonData); + @NonNull String jsonData, OffsetDateTime observedTimestamp); /** - * Persists child data fragment representing list-node (with one or more elements) under existing data node - * for the given anchor and dataspace. + * Persists child data fragment representing list-node (with one or more elements) under existing data node for the + * given anchor and dataspace. * * @param dataspaceName dataspace name * @param anchorName anchor name * @param parentNodeXpath parent node xpath * @param jsonData json data representing list element + * @param observedTimestamp observedTimestamp */ void saveListNodeData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String parentNodeXpath, - @NonNull String jsonData); + @NonNull String jsonData, OffsetDateTime observedTimestamp); /** * Retrieves datanode by XPath for given dataspace and anchor. @@ -82,9 +88,10 @@ public interface CpsDataService { * @param anchorName anchor name * @param parentNodeXpath xpath to parent node * @param jsonData json data + * @param observedTimestamp observedTimestamp */ void updateNodeLeaves(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String parentNodeXpath, - @NonNull String jsonData); + @NonNull String jsonData, OffsetDateTime observedTimestamp); /** * Replaces existing data node content including descendants. @@ -93,42 +100,47 @@ public interface CpsDataService { * @param anchorName anchor name * @param parentNodeXpath xpath to parent node * @param jsonData json data + * @param observedTimestamp observedTimestamp */ void replaceNodeTree(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String parentNodeXpath, - @NonNull String jsonData); + @NonNull String jsonData, OffsetDateTime observedTimestamp); /** - * Replaces (if exists) child data fragment representing list-node (with one or more elements) - * under existing data node for the given anchor and dataspace. + * Replaces (if exists) child data fragment representing list-node (with one or more elements) under existing data + * node for the given anchor and dataspace. * - * @param dataspaceName dataspace name - * @param anchorName anchor name - * @param parentNodeXpath parent node xpath - * @param jsonData json data representing list element + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param parentNodeXpath parent node xpath + * @param jsonData json data representing list element + * @param observedTimestamp observedTimestamp */ void replaceListNodeData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String parentNodeXpath, - @NonNull String jsonData); + @NonNull String jsonData, OffsetDateTime observedTimestamp); /** - * Deletes (if exists) child data fragment representing list-node (with one or more elements) - * under existing data node for the given anchor and dataspace. + * Deletes (if exists) child data fragment representing list-node (with one or more elements) under existing data + * node for the given anchor and dataspace. * - * @param dataspaceName dataspace name - * @param anchorName anchor name - * @param listNodeXpath list node xpath + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param listNodeXpath list node xpath + * @param observedTimestamp observedTimestamp */ - void deleteListNodeData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String listNodeXpath); + void deleteListNodeData(@NonNull String dataspaceName, @NonNull String anchorName, @NonNull String listNodeXpath, + OffsetDateTime observedTimestamp); /** - * Updates leaves of DataNode for given dataspace and anchor using xpath, - * along with the leaves of each Child Data Node which already exists. - * This method will throw an exception if data node update or any descendant update does not exist. + * Updates leaves of DataNode for given dataspace and anchor using xpath, along with the leaves of each Child Data + * Node which already exists. This method will throw an exception if data node update or any descendant update does + * not exist. * - * @param dataspaceName dataspace name - * @param anchorName anchor name - * @param parentNodeXpath xpath + * @param dataspaceName dataspace name + * @param anchorName anchor name + * @param parentNodeXpath xpath * @param dataNodeUpdatesAsJson json data representing data node updates + * @param observedTimestamp observedTimestamp */ void updateNodeLeavesAndExistingDescendantLeaves(String dataspaceName, String anchorName, String parentNodeXpath, - String dataNodeUpdatesAsJson); + String dataNodeUpdatesAsJson, OffsetDateTime observedTimestamp); } diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index 8989dc80ef..7b3567ed3c 100755 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -22,6 +22,7 @@ package org.onap.cps.api.impl; +import java.time.OffsetDateTime; import java.util.Collection; import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsAdminService; @@ -61,27 +62,28 @@ public class CpsDataServiceImpl implements CpsDataService { private NotificationService notificationService; @Override - public void saveData(final String dataspaceName, final String anchorName, final String jsonData) { + public void saveData(final String dataspaceName, final String anchorName, final String jsonData, + final OffsetDateTime observedTimestamp) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData); cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final String jsonData) { + final String jsonData, final OffsetDateTime observedTimestamp) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override public void saveListNodeData(final String dataspaceName, final String anchorName, - final String parentNodeXpath, final String jsonData) { + final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) { final Collection<DataNode> dataNodesCollection = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodesCollection); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override @@ -92,46 +94,48 @@ public class CpsDataServiceImpl implements CpsDataService { @Override public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final String jsonData) { + final String jsonData, final OffsetDateTime observedTimestamp) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves()); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName, - final String parentNodeXpath, - final String dataNodeUpdatesAsJson) { + final String parentNodeXpath, + final String dataNodeUpdatesAsJson, + final OffsetDateTime observedTimestamp) { final Collection<DataNode> dataNodeUpdates = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, dataNodeUpdatesAsJson); for (final DataNode dataNodeUpdate : dataNodeUpdates) { processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate); } - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override public void replaceNodeTree(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final String jsonData) { + final String jsonData, final OffsetDateTime observedTimestamp) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override public void replaceListNodeData(final String dataspaceName, final String anchorName, final String parentNodeXpath, - final String jsonData) { + final String jsonData, final OffsetDateTime observedTimestamp) { final Collection<DataNode> dataNodes = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @Override - public void deleteListNodeData(final String dataspaceName, final String anchorName, final String listNodeXpath) { + public void deleteListNodeData(final String dataspaceName, final String anchorName, final String listNodeXpath, + final OffsetDateTime observedTimestamp) { cpsDataPersistenceService.deleteListDataNodes(dataspaceName, anchorName, listNodeXpath); - processDataUpdatedEventAsync(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp); } @@ -171,9 +175,10 @@ public class CpsDataServiceImpl implements CpsDataService { } - private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName) { + private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName, + final OffsetDateTime observedTimestamp) { try { - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + notificationService.processDataUpdatedEvent(dataspaceName, anchorName, observedTimestamp); } catch (final Exception exception) { log.error("Failed to send message to notification service", exception); } diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java index e0c8fe7055..85e5abab0d 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java +++ b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java @@ -42,7 +42,7 @@ public class CpsDataUpdatedEventFactory { private static final URI EVENT_SCHEMA; private static final URI EVENT_SOURCE; private static final String EVENT_TYPE = "org.onap.cps.data-updated-event"; - private static final DateTimeFormatter dateTimeFormatter = + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); static { @@ -64,22 +64,25 @@ public class CpsDataUpdatedEventFactory { } /** - * Generates CPS Data Updated event. + * Generates CPS Data Updated event. If observedTimestamp is not provided, then current timestamp is used. * - * @param dataspaceName dataspaceName - * @param anchorName anchorName + * @param dataspaceName dataspaceName + * @param anchorName anchorName + * @param observedTimestamp observedTimestamp * @return CpsDataUpdatedEvent */ - public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) { + public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName, + final OffsetDateTime observedTimestamp) { final var dataNode = cpsDataService .getDataNode(dataspaceName, anchorName, "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); - return toCpsDataUpdatedEvent(anchor, dataNode); + return toCpsDataUpdatedEvent(anchor, dataNode, observedTimestamp); } - private CpsDataUpdatedEvent toCpsDataUpdatedEvent(final Anchor anchor, final DataNode dataNode) { + private CpsDataUpdatedEvent toCpsDataUpdatedEvent(final Anchor anchor, final DataNode dataNode, + final OffsetDateTime observedTimestamp) { final var cpsDataUpdatedEvent = new CpsDataUpdatedEvent(); - cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode)); + cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode, observedTimestamp)); cpsDataUpdatedEvent.withId(UUID.randomUUID().toString()); cpsDataUpdatedEvent.withSchema(EVENT_SCHEMA); cpsDataUpdatedEvent.withSource(EVENT_SOURCE); @@ -93,13 +96,15 @@ public class CpsDataUpdatedEventFactory { return data; } - private Content createContent(final Anchor anchor, final DataNode dataNode) { + private Content createContent(final Anchor anchor, final DataNode dataNode, + final OffsetDateTime observedTimestamp) { final var content = new Content(); content.withAnchorName(anchor.getName()); content.withDataspaceName(anchor.getDataspaceName()); content.withSchemaSetName(anchor.getSchemaSetName()); content.withData(createData(dataNode)); - content.withObservedTimestamp(dateTimeFormatter.format(OffsetDateTime.now())); + content.withObservedTimestamp( + DATE_TIME_FORMATTER.format(observedTimestamp == null ? OffsetDateTime.now() : observedTimestamp)); return content; } } diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java index 4745739a4f..029efbe795 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java +++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java @@ -20,6 +20,7 @@ package org.onap.cps.notification; +import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -79,15 +80,17 @@ public class NotificationService { * * @param dataspaceName dataspace name * @param anchorName anchor name + * @param observedTimestamp observedTimestamp * @return future */ @Async("notificationExecutor") - public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) { + public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName, + final OffsetDateTime observedTimestamp) { log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName); try { if (shouldSendNotification(dataspaceName)) { final var cpsDataUpdatedEvent = - cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, anchorName); + cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, anchorName, observedTimestamp); log.debug("data updated event to be published {}", cpsDataUpdatedEvent); notificationPublisher.sendNotification(cpsDataUpdatedEvent); } |