diff options
4 files changed, 168 insertions, 7 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java index 38f3db98de..6b02adb654 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java @@ -57,4 +57,15 @@ public interface CmNotificationSubscriptionPersistenceService { */ Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType, final String cmHandleId, final String xpath); + + /** + * Add or update cm notification subscription. + * + * @param datastoreType valid datastore type + * @param cmHandle cmhandle id + * @param xpath valid xpath + * @param newSubscriptionId subscription Id to be added + */ + void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandle, + final String xpath, final String newSubscriptionId); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java index 6e4997a4dd..5eca5e8c57 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java @@ -20,15 +20,26 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription.service; +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING; +import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS; + +import java.io.Serializable; +import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsQueryService; +import org.onap.cps.cpspath.parser.CpsPathUtil; import org.onap.cps.ncmp.api.impl.operations.DatastoreType; -import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.model.DataNode; +import org.onap.cps.utils.ContentType; +import org.onap.cps.utils.JsonObjectMapper; import org.springframework.stereotype.Service; @Slf4j @@ -37,14 +48,16 @@ import org.springframework.stereotype.Service; public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotificationSubscriptionPersistenceService { private static final String SUBSCRIPTION_ANCHOR_NAME = "cm-data-subscriptions"; - private static final String IS_ONGOING_CM_SUBSCRIPTION_CPS_PATH_QUERY = """ + private static final String CM_SUBSCRIPTION_CPS_PATH_QUERY = """ /datastores/datastore[@name='%s']/cm-handles/cm-handle[@id='%s']/filters/filter[@xpath='%s'] """.trim(); private static final String SUBSCRIPTION_IDS_CPS_PATH_QUERY = """ //filter/subscriptionIds[text()='%s'] """.trim(); + private final JsonObjectMapper jsonObjectMapper; private final CpsQueryService cpsQueryService; + private final CpsDataService cpsDataService; @Override public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, @@ -56,7 +69,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif public boolean isUniqueSubscriptionId(final String subscriptionId) { return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_IDS_CPS_PATH_QUERY.formatted(subscriptionId), - FetchDescendantsOption.OMIT_DESCENDANTS).isEmpty(); + OMIT_DESCENDANTS).isEmpty(); } @Override @@ -64,17 +77,65 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif final String cmHandleId, final String xpath) { final String isOngoingCmSubscriptionCpsPathQuery = - IS_ONGOING_CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, + CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)); final Collection<DataNode> existingNodes = cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME, - isOngoingCmSubscriptionCpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS); + isOngoingCmSubscriptionCpsPathQuery, OMIT_DESCENDANTS); if (existingNodes.isEmpty()) { return Collections.emptyList(); } return (List<String>) existingNodes.iterator().next().getLeaves().get("subscriptionIds"); } + @Override + public void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String newSubscriptionId) { + if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) { + final DataNode existingFilterNode = + cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, + escapeQuotesByDoublingThem(xpath)), + OMIT_DESCENDANTS).iterator().next(); + final Collection<String> existingSubscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType, + cmHandleId, xpath); + if (!existingSubscriptionIds.contains(newSubscriptionId)) { + updateListOfSubscribers(existingSubscriptionIds, newSubscriptionId, existingFilterNode); + } + } else { + addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, newSubscriptionId); + } + } + + private void addNewSubscriptionViaDatastore(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String newSubscriptionId) { + final String parentXpathFormat = "/datastores/datastore[@name='%s']/cm-handles"; + String parentXpath = ""; + if (datastoreType == PASSTHROUGH_RUNNING) { + parentXpath = parentXpathFormat.formatted("ncmp-datastore:passthrough-running"); + } else { + parentXpath = parentXpathFormat.formatted("ncmp-datastore:passthrough-operational"); + } + + final String updatedJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":" + + "[{\"xpath\":\"%s\",\"subscriptionIds\":[\"%s\"]}]}}]}", cmHandleId, xpath, newSubscriptionId); + cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson, + OffsetDateTime.now(), ContentType.JSON); + } + + private void updateListOfSubscribers(final Collection<String> existingSubscriptionIds, + final String newSubscriptionId, final DataNode existingFilterNode) { + final String parentXpath = CpsPathUtil.getNormalizedParentXpath(existingFilterNode.getXpath()); + final List<String> updatedSubscribers = new ArrayList<>(existingSubscriptionIds); + updatedSubscribers.add(newSubscriptionId); + final Map<String, Serializable> updatedLeaves = new HashMap<>(); + updatedLeaves.put("xpath", existingFilterNode.getLeaves().get("xpath")); + updatedLeaves.put("subscriptionIds", (Serializable) updatedSubscribers); + final String updatedJson = "{\"filter\":[" + jsonObjectMapper.asJsonString(updatedLeaves) + "]}"; + cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson, + OffsetDateTime.now()); + } + private static String escapeQuotesByDoublingThem(final String inputXpath) { return inputXpath.replace("'", "''"); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy index eb0e1100ed..19ebc3d711 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy @@ -20,18 +20,22 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription.service - +import org.onap.cps.api.CpsDataService import org.onap.cps.api.CpsQueryService import org.onap.cps.ncmp.api.impl.operations.DatastoreType import org.onap.cps.spi.FetchDescendantsOption import org.onap.cps.spi.model.DataNode +import org.onap.cps.utils.JsonObjectMapper +import com.fasterxml.jackson.databind.ObjectMapper import spock.lang.Specification class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification { + def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) def mockCpsQueryService = Mock(CpsQueryService) + def mockCpsDataService = Mock(CpsDataService) - def objectUnderTest = new CmNotificationSubscriptionPersistenceServiceImpl(mockCpsQueryService) + def objectUnderTest = new CmNotificationSubscriptionPersistenceServiceImpl(jsonObjectMapper, mockCpsQueryService, mockCpsDataService) def 'Check ongoing cm subscription #scenario'() { given: 'a valid cm subscription query' @@ -64,4 +68,43 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification 'datanodes present' | [new DataNode()] || false 'no datanodes present' | [] || true } + + def 'Add new subscriber to an ongoing cm notification subscription'() { + given: 'a valid cm subscription path query' + def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y'); + and: 'a dataNode exists for the given cps path query' + mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', + cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1']])] + when: 'the method to add/update cm notification subscription is called' + objectUnderTest.addOrUpdateCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId') + then: 'data service method to update list of subscribers is called once' + 1 * mockCpsDataService.updateNodeLeaves( + 'NCMP-Admin', + 'cm-data-subscriptions', + '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters', + '{"filter":[{"xpath":"/x/y","subscriptionIds":["sub-1","newSubId"]}]}', _) + } + + def 'Add new cm notification subscription for #datastoreType'() { + given: 'a valid cm subscription path query' + def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreName, 'ch-1', '/x/y') + and: 'a parent node xpath for given path above' + def parentNodeXpath = '/datastores/datastore[@name=\'%s\']/cm-handles' + and: 'a datanode does not exist for the given cps path query' + mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', + cpsPathQuery.formatted(datastoreName), + FetchDescendantsOption.OMIT_DESCENDANTS) >> [] + when: 'the method to add/update cm notification subscription is called' + objectUnderTest.addOrUpdateCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId') + then: 'data service method to update list of subscribers is called once with the correct parameters' + 1 * mockCpsDataService.saveData( + 'NCMP-Admin', + 'cm-data-subscriptions', + parentNodeXpath.formatted(datastoreName), + '{"cm-handle":[{"id":"ch-1","filters":{"filter":[{"xpath":"/x/y","subscriptionIds":["newSubId"]}]}}]}', _,_) + where: + scenario | datastoreType || datastoreName + 'passthrough_running' | DatastoreType.PASSTHROUGH_RUNNING || "ncmp-datastore:passthrough-running" + 'passthrough_operational' | DatastoreType.PASSTHROUGH_OPERATIONAL || "ncmp-datastore:passthrough-operational" + } }
\ No newline at end of file diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy new file mode 100644 index 0000000000..df74a05b50 --- /dev/null +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy @@ -0,0 +1,46 @@ +package org.onap.cps.integration.functional + +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING; +import org.onap.cps.integration.base.CpsIntegrationSpecBase; +import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService; +import org.springframework.beans.factory.annotation.Autowired; + +class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase { + + @Autowired + CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService; + + def 'Adding a new cm notification subscription'() { + given: 'there is no ongoing cm subscription for the following' + def datastoreType = PASSTHROUGH_RUNNING + def cmHandleId = 'ch-1' + def xpath = '/x/y' + assert cmNotificationSubscriptionPersistenceService. + getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 0 + when: 'we add a new cm notification subscription' + cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath, + 'subId-1') + then: 'there is an ongoing cm subscription for that CM handle and xpath' + assert cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType,cmHandleId,xpath) + and: 'only one subscription id is related to now ongoing cm subscription' + assert cmNotificationSubscriptionPersistenceService. + getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 1 + } + + def 'Adding a cm notification subscription to an already existing'() { + given: 'an ongoing cm subscription' + def datastoreType = PASSTHROUGH_RUNNING + def cmHandleId = 'ch-1' + def xpath = '/x/y' + cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath, + 'subId-1') + when: 'a new cm notification subscription is made for the SAME CM handle and xpath' + cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath, + 'subId-2') + then: 'it is added to the ongoing list of subscription ids' + def subscriptionIds = cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath) + assert subscriptionIds.size() == 2 + and: 'both subscription ids exists for the CM handle and xpath' + assert subscriptionIds.contains("subId-1") && subscriptionIds.contains("subId-2") + } +} |