From eb3a808eaad99ccccacd1b852034ec3bf02ed061 Mon Sep 17 00:00:00 2001 From: "rajesh.kumar" Date: Fri, 23 Feb 2024 20:29:59 +0530 Subject: Add Notification support in cps core Add notification support using cloud events Issue-ID:CPS-2068 Change-Id: I56c34400dc73c71b936a51260efd240223babacd Signed-off-by: rajesh.kumar --- cps-service/pom.xml | 16 +++ .../org/onap/cps/api/impl/CpsDataServiceImpl.java | 37 ++++++- .../cps/events/CpsDataUpdateEventsService.java | 107 +++++++++++++++++++++ .../main/java/org/onap/cps/events/CpsEvent.java | 66 +++++++++++++ .../cps/api/impl/CpsDataServiceImplSpec.groovy | 38 +++++++- .../onap/cps/api/impl/E2ENetworkSliceSpec.groovy | 6 +- .../events/CpsDataUpdateEventsServiceSpec.groovy | 95 ++++++++++++++++++ 7 files changed, 360 insertions(+), 5 deletions(-) create mode 100644 cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java create mode 100644 cps-service/src/main/java/org/onap/cps/events/CpsEvent.java create mode 100644 cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy (limited to 'cps-service') diff --git a/cps-service/pom.xml b/cps-service/pom.xml index adca617636..8197e9f95c 100644 --- a/cps-service/pom.xml +++ b/cps-service/pom.xml @@ -36,6 +36,10 @@ cps-service + + org.apache.commons + commons-lang3 + org.springframework spring-messaging @@ -74,6 +78,18 @@ io.micrometer micrometer-core + + io.cloudevents + cloudevents-json-jackson + + + io.cloudevents + cloudevents-kafka + + + io.cloudevents + cloudevents-spring + jakarta.validation jakarta.validation-api diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index b3f42279df..f556f40647 100644 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -3,7 +3,7 @@ * Copyright (C) 2021-2024 Nordix Foundation * Modifications Copyright (C) 2020-2022 Bell Canada. * Modifications Copyright (C) 2021 Pantheon.tech - * Modifications Copyright (C) 2022-2023 TechMahindra Ltd. + * Modifications Copyright (C) 2022-2024 TechMahindra Ltd. * Modifications Copyright (C) 2022 Deutsche Telekom AG * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -39,6 +39,8 @@ import org.onap.cps.api.CpsAnchorService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsDeltaService; import org.onap.cps.cpspath.parser.CpsPathUtil; +import org.onap.cps.events.CpsDataUpdateEventsService; +import org.onap.cps.events.model.Data.Operation; import org.onap.cps.spi.CpsDataPersistenceService; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.exceptions.DataValidationException; @@ -61,7 +63,9 @@ public class CpsDataServiceImpl implements CpsDataService { private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L; private final CpsDataPersistenceService cpsDataPersistenceService; + private final CpsDataUpdateEventsService cpsDataUpdateEventsService; private final CpsAnchorService cpsAnchorService; + private final CpsValidator cpsValidator; private final YangParser yangParser; private final CpsDeltaService cpsDeltaService; @@ -81,6 +85,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType); cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes); + sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.CREATE, observedTimestamp); } @Override @@ -99,6 +104,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType); cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.CREATE, observedTimestamp); } @Override @@ -116,6 +122,7 @@ public class CpsDataServiceImpl implements CpsDataService { cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath, listElementDataNodeCollection); } + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -151,6 +158,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Map> xpathToUpdatedLeaves = dataNodesInPatch.stream() .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves)); cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -167,6 +175,7 @@ public class CpsDataServiceImpl implements CpsDataService { for (final DataNode dataNodeUpdate : dataNodeUpdates) { processDataNodeUpdate(anchor, dataNodeUpdate); } + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -216,6 +225,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON); cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -228,6 +238,8 @@ public class CpsDataServiceImpl implements CpsDataService { final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); final Collection dataNodes = buildDataNodes(anchor, nodesJsonData); cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes); + nodesJsonData.keySet().forEach(nodeXpath -> + sendDataUpdatedEvent(anchor, nodeXpath, Operation.UPDATE, observedTimestamp)); } @Override @@ -248,7 +260,9 @@ public class CpsDataServiceImpl implements CpsDataService { public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath, final Collection dataNodes, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes); + sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp); } @Override @@ -258,6 +272,8 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp); } @Override @@ -267,8 +283,12 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection dataNodeXpaths, final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + dataNodeXpaths.forEach(dataNodeXpath -> + sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp)); } + @Override @Timed(value = "cps.data.service.datanode.delete.anchor", description = "Time taken to delete all datanodes for an anchor") @@ -276,6 +296,8 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp); } @Override @@ -286,6 +308,9 @@ public class CpsDataServiceImpl implements CpsDataService { cpsValidator.validateNameCharacters(dataspaceName); cpsValidator.validateNameCharacters(anchorNames); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames); + for (final Anchor anchor : cpsAnchorService.getAnchors(dataspaceName, anchorNames)) { + sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp); + } } @Override @@ -295,6 +320,8 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { cpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath); + final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName); + sendDataUpdatedEvent(anchor, listNodeXpath, Operation.DELETE, observedTimestamp); } private Collection buildDataNodes(final Anchor anchor, final Map nodesJsonData) { @@ -345,4 +372,12 @@ public class CpsDataServiceImpl implements CpsDataService { } } + private void sendDataUpdatedEvent(final Anchor anchor, final String xpath, + final Operation operation, final OffsetDateTime observedTimestamp) { + try { + cpsDataUpdateEventsService.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp); + } catch (final Exception exception) { + log.error("Failed to send message to notification service", exception); + } + } } diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java new file mode 100644 index 0000000000..e3315c9aba --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 TechMahindra Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.events; + +import io.cloudevents.CloudEvent; +import io.micrometer.core.annotation.Timed; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.events.model.CpsDataUpdatedEvent; +import org.onap.cps.events.model.Data; +import org.onap.cps.events.model.Data.Operation; +import org.onap.cps.spi.model.Anchor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CpsDataUpdateEventsService { + + private final EventsPublisher eventsPublisher; + + @Value("${app.cps.data-updated.topic:cps-data-updated-events}") + private String topicName; + + @Value("${notification.enabled:false}") + private boolean notificationsEnabled; + + /** + * Publish the cps data update event with header to the public topic. + * + * @param anchor Anchor of the updated data + * @param xpath xpath of the updated data + * @param operation operation performed on the data + * @param observedTimestamp timestamp when data was updated. + */ + @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event") + public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath, + final Operation operation, final OffsetDateTime observedTimestamp) { + if (notificationsEnabled) { + final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor, + observedTimestamp, xpath, operation); + final String updateEventId = anchor.getDataspaceName() + ":" + anchor.getName(); + final Map extensions = createUpdateEventExtensions(updateEventId); + final CloudEvent cpsDataUpdatedEventAsCloudEvent = + CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent) + .extensions(extensions).build().asCloudEvent(); + eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent); + } else { + log.debug("Notifications disabled."); + } + } + + private CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor, final OffsetDateTime observedTimestamp, + final String xpath, + final Operation rootNodeOperation) { + final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent(); + final Data updateEventData = new Data(); + updateEventData.setObservedTimestamp(observedTimestamp.toString()); + updateEventData.setDataspaceName(anchor.getDataspaceName()); + updateEventData.setAnchorName(anchor.getName()); + updateEventData.setSchemaSetName(anchor.getSchemaSetName()); + updateEventData.setOperation(getRootNodeOperation(xpath, rootNodeOperation)); + updateEventData.setXpath(xpath); + cpsDataUpdatedEvent.setData(updateEventData); + return cpsDataUpdatedEvent; + } + + private Map createUpdateEventExtensions(final String eventKey) { + final Map extensions = new HashMap<>(); + extensions.put("correlationid", eventKey); + return extensions; + } + + private Operation getRootNodeOperation(final String xpath, final Operation operation) { + return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE; + } + + private static boolean isRootXpath(final String xpath) { + return "/".equals(xpath) || "".equals(xpath); + } + + private static boolean isRootContainerNodeXpath(final String xpath) { + return 0 == xpath.lastIndexOf('/'); + } +} diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java new file mode 100644 index 0000000000..c19abc1764 --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 TechMahindra Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.events; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import java.net.URI; +import java.util.Map; +import java.util.UUID; +import lombok.Builder; +import org.apache.commons.lang3.StringUtils; +import org.onap.cps.utils.JsonObjectMapper; + +@Builder +public class CpsEvent { + + private Object data; + private Map extensions; + private String type; + @Builder.Default + private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0"; + @Builder.Default + private static final String CLOUD_EVENT_SOURCE = "CPS"; + + private final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()); + + /** + * Creates ncmp cloud event with provided attributes. + * + * @return Cloud Event + */ + + public CloudEvent asCloudEvent() { + final CloudEventBuilder cloudEventBuilder = io.cloudevents.core.builder + .CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create(CLOUD_EVENT_SOURCE)) + .withType(type) + .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1)) + .withData(jsonObjectMapper.asJsonBytes(data)); + extensions.entrySet().stream() + .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue())) + .forEach(extensionEntry -> + cloudEventBuilder.withExtension(extensionEntry.getKey(), extensionEntry.getValue())); + return cloudEventBuilder.build(); + } +} diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy index b2b2d7d44c..fcbfd0561a 100644 --- a/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy @@ -3,7 +3,7 @@ * Copyright (C) 2021-2024 Nordix Foundation * Modifications Copyright (C) 2021 Pantheon.tech * Modifications Copyright (C) 2021-2022 Bell Canada. - * Modifications Copyright (C) 2022-2023 TechMahindra Ltd. + * Modifications Copyright (C) 2022-2024 TechMahindra Ltd. * Modifications Copyright (C) 2022 Deutsche Telekom AG * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +23,13 @@ package org.onap.cps.api.impl +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import ch.qos.logback.core.read.ListAppender import org.onap.cps.TestUtils import org.onap.cps.api.CpsAnchorService import org.onap.cps.api.CpsDeltaService +import org.onap.cps.events.CpsDataUpdateEventsService import org.onap.cps.spi.CpsDataPersistenceService import org.onap.cps.spi.FetchDescendantsOption import org.onap.cps.spi.exceptions.ConcurrencyException @@ -41,6 +45,8 @@ import org.onap.cps.utils.YangParser import org.onap.cps.utils.YangParserHelper import org.onap.cps.yang.YangTextSchemaSourceSet import org.onap.cps.yang.YangTextSchemaSourceSetBuilder +import org.slf4j.LoggerFactory +import org.springframework.context.annotation.AnnotationConfigApplicationContext import spock.lang.Shared import spock.lang.Specification import java.time.OffsetDateTime @@ -52,13 +58,28 @@ class CpsDataServiceImplSpec extends Specification { def mockCpsValidator = Mock(CpsValidator) def yangParser = new YangParser(new YangParserHelper(), mockYangTextSchemaSourceSetCache) def mockCpsDeltaService = Mock(CpsDeltaService); + def mockDataUpdateEventsService = Mock(CpsDataUpdateEventsService) - def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService) + def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockDataUpdateEventsService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService) + + def logger = (Logger) LoggerFactory.getLogger(objectUnderTest.class) + def loggingListAppender + def applicationContext = new AnnotationConfigApplicationContext() def setup() { mockCpsAnchorService.getAnchor(dataspaceName, anchorName) >> anchor mockCpsAnchorService.getAnchor(dataspaceName, ANCHOR_NAME_1) >> anchor1 mockCpsAnchorService.getAnchor(dataspaceName, ANCHOR_NAME_2) >> anchor2 + logger.setLevel(Level.DEBUG) + loggingListAppender = new ListAppender() + logger.addAppender(loggingListAppender) + loggingListAppender.start() + applicationContext.refresh() + } + + void cleanup() { + ((Logger) LoggerFactory.getLogger(CpsDataServiceImpl.class)).detachAndStopAllAppenders() + applicationContext.close() } @Shared @@ -459,6 +480,19 @@ class CpsDataServiceImplSpec extends Specification { 1 * mockCpsDataPersistenceService.lockAnchor('some-sessionId', 'some-dataspaceName', 'some-anchorName', 250L) } + def 'Exception is thrown while publishing the notification.'(){ + given: 'schema set for given anchor and dataspace references test-tree model' + setupSchemaSetMocks('test-tree.yang') + when: 'publisher set to throw an exception' + mockDataUpdateEventsService.publishCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("publishing failed")} + and: 'an update event is performed' + objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/', '{"test-tree": {"branch": []}}', observedTimestamp) + then: 'the exception is not bubbled up' + noExceptionThrown() + and: "the exception message is logged" + def logs = loggingListAppender.list.toString() + assert logs.contains('Failed to send message to notification service') + } def setupSchemaSetMocks(String... yangResources) { def mockYangTextSchemaSourceSet = Mock(YangTextSchemaSourceSet) mockYangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName) >> mockYangTextSchemaSourceSet diff --git a/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy index 140dfaac96..57f2f8ea7c 100755 --- a/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy @@ -3,7 +3,7 @@ * Copyright (C) 2021-2024 Nordix Foundation. * Modifications Copyright (C) 2021-2022 Bell Canada. * Modifications Copyright (C) 2021 Pantheon.tech - * Modifications Copyright (C) 2022-2023 TechMahindra Ltd. + * Modifications Copyright (C) 2022-2024 TechMahindra Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ package org.onap.cps.api.impl import org.onap.cps.TestUtils import org.onap.cps.api.CpsAnchorService import org.onap.cps.api.CpsDeltaService +import org.onap.cps.events.CpsDataUpdateEventsService import org.onap.cps.spi.CpsDataPersistenceService import org.onap.cps.spi.CpsModulePersistenceService import org.onap.cps.spi.model.Anchor @@ -50,7 +51,8 @@ class E2ENetworkSliceSpec extends Specification { def cpsModuleServiceImpl = new CpsModuleServiceImpl(mockModuleStoreService, mockYangTextSchemaSourceSetCache, mockCpsAnchorService, mockCpsValidator,timedYangTextSchemaSourceSetBuilder) - def cpsDataServiceImpl = new CpsDataServiceImpl(mockDataStoreService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService) + def mockDataUpdateEventsService = Mock(CpsDataUpdateEventsService) + def cpsDataServiceImpl = new CpsDataServiceImpl(mockDataStoreService, mockDataUpdateEventsService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService) def dataspaceName = 'someDataspace' def anchorName = 'someAnchor' diff --git a/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy new file mode 100644 index 0000000000..81b2bf2c95 --- /dev/null +++ b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 TechMahindra Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.events + +import static org.onap.cps.events.model.Data.Operation.CREATE +import static org.onap.cps.events.model.Data.Operation.DELETE +import static org.onap.cps.events.model.Data.Operation.UPDATE + +import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.jackson.PojoCloudEventDataMapper +import org.onap.cps.events.model.CpsDataUpdatedEvent +import org.onap.cps.events.model.Data +import org.onap.cps.spi.model.Anchor +import org.onap.cps.utils.JsonObjectMapper +import org.springframework.test.context.ContextConfiguration +import spock.lang.Specification + +import java.time.OffsetDateTime + +@ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper]) +class CpsDataUpdateEventsServiceSpec extends Specification { + def mockEventsPublisher = Mock(EventsPublisher) + def notificationsEnabled = true + def objectMapper = new ObjectMapper(); + + def objectUnderTest = new CpsDataUpdateEventsService(mockEventsPublisher) + + def 'Create and Publish cps update event where events are #scenario'() { + given: 'an anchor, operation and observed timestamp' + def anchor = new Anchor('anchor01', 'dataspace01', 'schema01'); + def operation = operationInRequest + def observedTimestamp = OffsetDateTime.now() + and: 'notificationsEnabled is #notificationsEnabled and it will be true as default' + objectUnderTest.notificationsEnabled = true + when: 'service is called to publish data update event' + objectUnderTest.topicName = "cps-core-event" + objectUnderTest.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp) + then: 'the event contains the required attributes' + 1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> { + args -> + { + def cpsDataUpdatedEvent = (args[2] as CloudEvent) + assert cpsDataUpdatedEvent.getExtension('correlationid') == 'dataspace01:anchor01' + assert cpsDataUpdatedEvent.type == 'org.onap.cps.events.model.CpsDataUpdatedEvent' + assert cpsDataUpdatedEvent.source.toString() == 'CPS' + def actualEventOperation = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().data.operation + assert actualEventOperation == expectedOperation + } + } + where: 'the following values are used' + scenario | xpath | operationInRequest || expectedOperation + 'empty xpath' | '' | CREATE || CREATE + 'root xpath and create operation' | '/' | CREATE || CREATE + 'root xpath and update operation' | '/' | UPDATE || UPDATE + 'root xpath and delete operation' | '/' | DELETE || DELETE + 'not root xpath and update operation' | 'test' | UPDATE || UPDATE + 'root node xpath and create operation' | '/test' | CREATE || CREATE + 'non root node xpath and update operation' | '/test/path' | CREATE || UPDATE + 'non root node xpath and delete operation' | '/test/path' | DELETE || UPDATE + } + + def 'publish cps update event when notification service is disabled'() { + given: 'an anchor, operation and observed timestamp' + def anchor = new Anchor('anchor01', 'dataspace01', 'schema01'); + def operation = CREATE + def observedTimestamp = OffsetDateTime.now() + and: 'notificationsEnabled is flase' + objectUnderTest.notificationsEnabled = false + when: 'service is called to publish data update event' + objectUnderTest.topicName = "cps-core-event" + objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', operation, observedTimestamp) + then: 'the event contains the required attributes' + 0 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) + } +} -- cgit 1.2.3-korg