diff options
Diffstat (limited to 'cps-ncmp-service/src')
22 files changed, 506 insertions, 72 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java index 6ff79a9344..255b3847eb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java @@ -31,20 +31,27 @@ public interface DataJobService { /** * process read data job operations. * - * @param dataJobId Unique identifier of the job within the request + * @param authorization the authorization header from the REST request + * @param dataJobId unique identifier of the job within the request * @param dataJobMetadata data job request headers * @param dataJobReadRequest read data job request */ - void readDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobReadRequest dataJobReadRequest); + void readDataJob(String authorization, + String dataJobId, + DataJobMetadata dataJobMetadata, + DataJobReadRequest dataJobReadRequest); /** * process write data job operations. * - * @param dataJobId Unique identifier of the job within the request + * @param authorization the authorization header from the REST request + * @param dataJobId unique identifier of the job within the request * @param dataJobMetadata data job request headers * @param dataJobWriteRequest write data job request * @return a list of sub-job write responses */ - List<SubJobWriteResponse> writeDataJob(String dataJobId, DataJobMetadata dataJobMetadata, + List<SubJobWriteResponse> writeDataJob(String authorization, + String dataJobId, + DataJobMetadata dataJobMetadata, DataJobWriteRequest dataJobWriteRequest); }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java new file mode 100644 index 0000000000..50d96f858c --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.datajobs; + +/** + * Service interface to check the status of a data job. + * The operations interact with a DMI Plugin to retrieve data job statuses. + */ +public interface DataJobStatusService { + + /** + * Retrieves the current status of a specific data job. + * + * @param authorization The authorization header from the REST request. + * @param dmiServiceName The name of the DMI Service relevant to the data job. + * @param requestId The unique identifier for the overall data job request. + * @param dataProducerJobId The identifier of the data producer job within the DMI system. + * @param dataProducerId The ID of the producer registered by DMI, used for operations related to this request. + * This could include alternate IDs or specific identifiers. + * @return The current status of the data job as a String. + */ + String getDataJobStatus(final String authorization, + final String dmiServiceName, + final String requestId, + final String dataProducerJobId, + final String dataProducerId); +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java index c5052f1405..b5ab7f6525 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java @@ -50,7 +50,7 @@ public class DmiCacheHandler { private final InventoryPersistence inventoryPersistence; /** - * Adds new subscription to the subscription cache. + * Adds subscription to the subscription cache. * * @param subscriptionId subscription id * @param predicates subscription request predicates @@ -60,6 +60,18 @@ public class DmiCacheHandler { } /** + * Adds subscription to the subscription cache. + * + * @param subscriptionId subscription id + * @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi + */ + public void add(final String subscriptionId, + final Map<String, DmiCmSubscriptionDetails> + dmiCmSubscriptionDetailsPerDmi) { + cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi); + } + + /** * Get cm notification subscription cache entry via subscription id. * * @param subscriptionId subscription id @@ -122,8 +134,8 @@ public class DmiCacheHandler { * @param status String of status * */ - public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName, - final CmSubscriptionStatus status) { + public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName, + final CmSubscriptionStatus status) { final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = cmNotificationSubscriptionCache.get(subscriptionId); dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status); @@ -162,7 +174,7 @@ public class DmiCacheHandler { * @param dmiServiceName String of dmiServiceName * */ - public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) { + public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) { final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) .getDmiCmSubscriptionPredicates(); @@ -210,6 +222,6 @@ public class DmiCacheHandler { private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) { return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED") - || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED"); + || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED"); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java index 20ccf528ed..20c7c7b13d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java @@ -80,7 +80,7 @@ public class DmiOutEventConsumer { dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); } if (eventType.equals("subscriptionDeleteResponse")) { - dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName); + dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName); } handleEventsStatusPerDmi(subscriptionId, eventType); } @@ -96,7 +96,7 @@ public class DmiOutEventConsumer { private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName, final CmSubscriptionStatus cmSubscriptionStatus) { - dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, + dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, cmSubscriptionStatus); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java new file mode 100644 index 0000000000..cd4a15af47 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.models; + +import java.util.Collection; +import java.util.Map; + +/** + * Tuple to be used during for to delete usecase. + * + * @param lastRemainingSubscriptionsPerDmi subscriptions that are used by only one subscriber grouped per dmi + * @param overlappingSubscriptionsPerDmi subscriptions that are shared by multiple subscribers grouped per dmi + */ +public record DmiCmSubscriptionTuple(Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi, + Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi) { +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java index 3a9b2066b2..90c5c575e6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java @@ -37,8 +37,7 @@ public interface CmSubscriptionHandler { * Process cm notification subscription delete request. * * @param subscriptionId subscription id - * @param predicates subscription predicates */ - void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates); + void processSubscriptionDeleteRequest(final String subscriptionId); }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java index 9d33d25816..c2c71dbaae 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java @@ -21,34 +21,50 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper; import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper; import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple; import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; +import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; +import org.onap.cps.spi.model.DataNode; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { + private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile( + "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/" + + "filters/filter\\[@xpath='(.*)']$"); + private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService; private final CmSubscriptionComparator cmSubscriptionComparator; private final NcmpOutEventMapper ncmpOutEventMapper; private final DmiInEventMapper dmiInEventMapper; + private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper; private final NcmpOutEventProducer ncmpOutEventProducer; private final DmiInEventProducer dmiInEventProducer; private final DmiCacheHandler dmiCacheHandler; + private final InventoryPersistence inventoryPersistence; @Override public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) { @@ -62,10 +78,45 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { } @Override - public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) { - dmiCacheHandler.add(subscriptionId, predicates); - sendSubscriptionDeleteRequestToDmi(subscriptionId); - scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + public void processSubscriptionDeleteRequest(final String subscriptionId) { + final Collection<DataNode> subscriptionDataNodes = + cmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId); + final DmiCmSubscriptionTuple dmiCmSubscriptionTuple = + getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes); + dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple)); + if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) { + acceptAndPublishDeleteRequest(subscriptionId); + } else { + sendSubscriptionDeleteRequestToDmi(subscriptionId, + dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( + dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi())); + scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + } + } + + private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps( + final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) { + final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi = + dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( + dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()); + final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi = + dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( + dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi()); + final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi = + new HashMap<>(lastRemainingDmiSubscriptionsPerDmi); + overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) -> + mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails, + this::mergeDmiCmSubscriptionDetails)); + return mergedDmiSubscriptionsPerDmi; + } + + private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails( + final DmiCmSubscriptionDetails dmiCmSubscriptionDetails, + final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) { + final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates = + new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); + mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); + return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING); } private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) { @@ -81,6 +132,19 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false); } + private void acceptAndPublishDeleteRequest(final String subscriptionId) { + final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet(); + for (final String dmiServiceName : dmiServiceNames) { + dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName, + CmSubscriptionStatus.ACCEPTED); + dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName); + } + final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, + dmiCacheHandler.get(subscriptionId)); + ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent, + false); + } + private void handleNewCmSubscription(final String subscriptionId) { final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId); @@ -98,26 +162,68 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { } private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName, - final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates); dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName, "subscriptionCreateRequest", dmiInEvent); } private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) { - dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, + dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED); dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); } - private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) { - final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = - dmiCacheHandler.get(subscriptionId); - dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { - final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent( - dmiSubscriptionDetails.getDmiCmSubscriptionPredicates()); - dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName, - "subscriptionDeleteRequest", dmiInEvent); + private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId, + final Map<String, DmiCmSubscriptionDetails> + dmiCmSubscriptionsPerDmi) { + dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> { + final DmiInEvent dmiInEvent = + dmiInEventMapper.toDmiInEvent( + dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); + dmiInEventProducer.publishDmiInEvent(subscriptionId, + dmiPluginName, "subscriptionDeleteRequest", dmiInEvent); }); } + + + private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi( + final Collection<DataNode> subscriptionNodes) { + final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>(); + final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>(); + + for (final DataNode subscriptionNode : subscriptionNodes) { + final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath()); + final String dmiServiceName = inventoryPersistence.getYangModelCmHandle( + dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName(); + final List<String> subscribers = (List<String>) subscriptionNode.getLeaves().get("subscriptionIds"); + populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi, + lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey); + } + return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi); + } + + private static void populateDmiCmSubscriptionTuple(final List<String> subscribers, + final Map<String, Collection<DmiCmSubscriptionKey>> + overlappingSubscriptionsPerDmi, + final Map<String, Collection<DmiCmSubscriptionKey>> + lastRemainingSubscriptionsPerDmi, + final String dmiServiceName, + final DmiCmSubscriptionKey dmiCmSubscriptionKey) { + final Map<String, Collection<DmiCmSubscriptionKey>> targetMap = + subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi; + targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey); + } + + private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) { + final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath); + if (matcher.find()) { + final String datastoreName = matcher.group(1); + final String cmHandleId = matcher.group(2); + final String filterXpath = matcher.group(3); + return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath); + } + throw new IllegalArgumentException("DataNode xpath does not represent a subscription key"); + } + }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java index 1e1359dd0d..cba64e0e94 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java @@ -63,7 +63,7 @@ public class NcmpInEventConsumer { if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) { log.info("Subscription delete request for source {} with subscription id {} ...", cloudEvent.getSource(), subscriptionId); - cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates); + cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java index c24507a1a7..c71109013a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java @@ -159,6 +159,18 @@ public class CmSubscriptionPersistenceService { } } + /** + * Retrieve all existing dataNodes for given subscription id. + * + * @param subscriptionId subscription id + * @return collection of DataNodes + */ + public Collection<DataNode> getAllNodesForSubscriptionId(final String subscriptionId) { + return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), + OMIT_DESCENDANTS); + } + private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId, final String xpath) { cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java index 56ed6e30da..04c3ad2fc6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java @@ -42,19 +42,26 @@ public class DataJobServiceImpl implements DataJobService { private final WriteRequestExaminer writeRequestExaminer; @Override - public void readDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata, + public void readDataJob(final String authorization, + final String dataJobId, + final DataJobMetadata dataJobMetadata, final DataJobReadRequest dataJobReadRequest) { log.info("data job id for read operation is: {}", dataJobId); } @Override - public List<SubJobWriteResponse> writeDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata, + public List<SubJobWriteResponse> writeDataJob(final String authorization, + final String dataJobId, + final DataJobMetadata dataJobMetadata, final DataJobWriteRequest dataJobWriteRequest) { log.info("data job id for write operation is: {}", dataJobId); final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey = writeRequestExaminer.splitDmiWriteOperationsFromRequest(dataJobId, dataJobWriteRequest); - return dmiSubJobClient.sendRequestsToDmi(dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey); + return dmiSubJobClient.sendRequestsToDmi(authorization, + dataJobId, + dataJobMetadata, + dmiWriteOperationsPerProducerKey); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java new file mode 100644 index 0000000000..a6ecaa1097 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.datajobs; + +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.api.datajobs.DataJobStatusService; +import org.onap.cps.ncmp.impl.dmi.DmiProperties; +import org.onap.cps.ncmp.impl.dmi.DmiRestClient; +import org.onap.cps.ncmp.impl.dmi.DmiServiceUrlTemplateBuilder; +import org.onap.cps.ncmp.impl.dmi.UrlTemplateParameters; +import org.springframework.stereotype.Service; + +/** + * Implementation of {@link DataJobStatusService} interface. + * The operations interact with a DMI Plugin to retrieve data job statuses. + */ +@Service +@RequiredArgsConstructor +public class DataJobStatusServiceImpl implements DataJobStatusService { + + private final DmiRestClient dmiRestClient; + private final DmiProperties dmiProperties; + + @Override + public String getDataJobStatus(final String authorization, + final String dmiServiceName, + final String requestId, + final String dataProducerJobId, + final String dataProducerId) { + + final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, requestId, + dataProducerJobId, dataProducerId); + return dmiRestClient.getDataJobStatus(urlTemplateParameters, authorization).block(); + } + + private UrlTemplateParameters buildUrlParameters(final String dmiServiceName, + final String requestId, + final String dataProducerJobId, + final String dataProducerId) { + return DmiServiceUrlTemplateBuilder.newInstance() + .fixedPathSegment("dataJob") + .variablePathSegment("requestId", requestId) + .fixedPathSegment("dataProducerJob") + .variablePathSegment("dataProducerJobId", dataProducerJobId) + .fixedPathSegment("status") + .queryParameter("dataProducerId", dataProducerId) + .createUrlTemplateParameters(dmiServiceName, dmiProperties.getDmiBasePath()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java index 1624ce8ae6..c93709ce75 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java @@ -48,17 +48,19 @@ public class DmiSubJobRequestHandler { private final DmiRestClient dmiRestClient; private final DmiProperties dmiProperties; private final JsonObjectMapper jsonObjectMapper; - static final String NO_AUTH_HEADER = null; /** * Sends sub-job write requests to the DMI Plugin. * - * @param dataJobId data ojb identifier + * @param authorization the authorization header from the REST request + * @param dataJobId data job identifier * @param dataJobMetadata the data job's metadata - * @param dmiWriteOperationsPerProducerKey a collection of write requests per producer key. + * @param dmiWriteOperationsPerProducerKey a collection of write requests per producer key * @return a list of sub-job write responses */ - public List<SubJobWriteResponse> sendRequestsToDmi(final String dataJobId, final DataJobMetadata dataJobMetadata, + public List<SubJobWriteResponse> sendRequestsToDmi(final String authorization, + final String dataJobId, + final DataJobMetadata dataJobMetadata, final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey) { final List<SubJobWriteResponse> subJobWriteResponses = new ArrayList<>(dmiWriteOperationsPerProducerKey.size()); dmiWriteOperationsPerProducerKey.forEach((producerKey, dmi3ggpWriteOperations) -> { @@ -71,7 +73,7 @@ public class DmiSubJobRequestHandler { urlTemplateParameters, jsonObjectMapper.asJsonString(subJobWriteRequest), OperationType.CREATE, - NO_AUTH_HEADER); + authorization); final SubJobWriteResponse subJobWriteResponse = jsonObjectMapper .convertToValueType(responseEntity.getBody(), SubJobWriteResponse.class); log.debug("Sub job write response: {}", subJobWriteResponse); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java index 7ac85cbf84..ba6bba9c53 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java @@ -144,6 +144,26 @@ public class DmiRestClient { .defaultIfEmpty(NOT_SPECIFIED); } + /** + * Retrieves the status of a data job from the DMI service. + * + * @param urlTemplateParameters The URL template parameters for the DMI data job status endpoint. + * @param authorization The authorization token to be added to the request headers. + * @return A Mono emitting the status of the data job as a String. + * @throws DmiClientRequestException If there is an error during the DMI request. + */ + public Mono<String> getDataJobStatus(final UrlTemplateParameters urlTemplateParameters, + final String authorization) { + + return dataServicesWebClient.get() + .uri(urlTemplateParameters.urlTemplate(), urlTemplateParameters.urlVariables()) + .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) + .retrieve() + .bodyToMono(JsonNode.class) + .map(responseHealthStatus -> responseHealthStatus.path("status").asText()) + .onErrorMap(throwable -> handleDmiClientException(throwable, OperationType.READ.getOperationName())); + } + private WebClient getWebClient(final RequiredDmiService requiredDmiService) { return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient; } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy index 2d50e770dd..791a154608 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy @@ -65,17 +65,34 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec { initialiseMockInventoryPersistenceResponses() } - def 'Load CM subscription event to cache'() { - given: 'a valid subscription event with Id' + def 'Load CM subscription event to cache with predicates'() { + given: 'a subscription event with id' def subscriptionId = ncmpInEvent.getData().getSubscriptionId() and: 'list of predicates' def predicates = ncmpInEvent.getData().getPredicates() - when: 'a valid event object loaded in cache' + when: 'subscription is loaded to cache with predicates' objectUnderTest.add(subscriptionId, predicates) - then: 'the cache contains the correct entry with #subscriptionId subscription ID' + then: 'the number of entries in cache is correct' + assert testCache.size() == 1 + and: 'the cache contains the correct entries' assert testCache.containsKey(subscriptionId) } + def 'Load CM subscription event to cache with dmi subscription details per dmi'() { + given: 'a subscription event with id' + def subscriptionId = ncmpInEvent.getData().getSubscriptionId() + and: 'dmi subscription details per dmi' + def dmiSubscriptionsPerDmi = [:] + when: 'subscription is loaded to cache with dmi subscription details per dmi' + objectUnderTest.add(subscriptionId, dmiSubscriptionsPerDmi) + then: 'the number of entries in cache is correct' + assert testCache.size() == 1 + and: 'the cache contains the correct entries' + assert testCache.containsKey(subscriptionId) + and: 'the entry for the subscription ID matches the provided DMI subscription details' + assert testCache.get(subscriptionId) == dmiSubscriptionsPerDmi + } + def 'Get cache entry via subscription id'() { given: 'the cache contains value for some-id' testCache.put('some-id',[:]) @@ -157,7 +174,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec { def subscriptionId = ncmpInEvent.getData().getSubscriptionId() objectUnderTest.add(subscriptionId, predicates) when: 'subscription status per dmi is updated in cache' - objectUnderTest.updateDmiSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED) + objectUnderTest.updateDmiSubscriptionStatus(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED) then: 'verify status has been updated in cache' def predicate = testCache.get(subscriptionId) assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED @@ -180,7 +197,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec { def subscriptionId = ncmpInEvent.getData().getSubscriptionId() objectUnderTest.add(subscriptionId, predicates) when: 'subscription is persisted in database' - objectUnderTest.removeFromDatabasePerDmi(subscriptionId,'dmi-1') + objectUnderTest.removeFromDatabase(subscriptionId,'dmi-1') then: 'persistence service is called the correct number of times per dmi' 4 * mockCmSubscriptionPersistenceService.removeCmSubscription(_,_,_,subscriptionId) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy index 6e28d14810..bcf8780873 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy @@ -103,7 +103,7 @@ class DmiOutEventConsumerSpec extends MessagingBaseSpec { when: 'the event is consumed' objectUnderTest.consumeDmiOutEvent(consumerRecord) then: 'correct number of calls to cache' - expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus) + expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1','test-dmi-plugin-name', subscriptionStatus) and: 'correct number of calls to persist cache' expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name') and: 'correct number of calls to map the ncmp out event' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy index 3f6556d47e..f902c60482 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp import com.fasterxml.jackson.databind.ObjectMapper import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler +import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails @@ -30,7 +31,10 @@ import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPer import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent +import org.onap.cps.ncmp.impl.inventory.InventoryPersistence +import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.spi.model.DataNode import org.onap.cps.utils.JsonObjectMapper import spock.lang.Specification @@ -45,13 +49,15 @@ class CmSubscriptionHandlerImplSpec extends Specification { def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator) def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper) def mockDmiInEventMapper = Mock(DmiInEventMapper) + def dmiCmSubscriptionDetailsPerDmiMapper = new DmiCmSubscriptionDetailsPerDmiMapper() def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer) def mockDmiInEventProducer = Mock(DmiInEventProducer) def mockDmiCacheHandler = Mock(DmiCacheHandler) + def mockInventoryPersistence = Mock(InventoryPersistence) def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, - mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper, - mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler) + mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper, dmiCmSubscriptionDetailsPerDmiMapper, + mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler, mockInventoryPersistence) def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)] @@ -97,7 +103,7 @@ class CmSubscriptionHandlerImplSpec extends Specification { then: 'the subscription cache handler is called once' 1 * mockDmiCacheHandler.add('test-id', _) and: 'the subscription details are updated in the cache' - 1 * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('test-id', _, ACCEPTED) + 1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED) and: 'we schedule to send the response after configured time from the cache' 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true) } @@ -123,24 +129,44 @@ class CmSubscriptionHandlerImplSpec extends Specification { } def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() { - given: 'a cmNotificationSubscriptionNcmp in event for delete' - def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') - def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class) - and: 'relevant details is extracted from the event' - def subscriptionId = testEventConsumed.getData().getSubscriptionId() - def predicates = testEventConsumed.getData().getPredicates() - and: 'the cache handler returns for relevant subscription id' - 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi - when: 'the valid and unique event is consumed' - objectUnderTest.processSubscriptionDeleteRequest(subscriptionId, predicates) - then: 'the subscription cache handler is called once' - 1 * mockDmiCacheHandler.add('test-id', predicates) - and: 'the mapper handler to get DMI in event is called once' - 1 * mockDmiInEventMapper.toDmiInEvent(_) - and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters' - testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.publishDmiInEvent( - 'test-id', 'dmi-1', 'subscriptionDeleteRequest', _) - and: 'we schedule to send the response after configured time from the cache' - 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionDeleteResponse', null, true) + given: 'a test subscription id' + def subscriptionId = 'test-id' + and: 'the persistence service returns datanodes' + 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >> + [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]), + new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])] + and: 'the inventory persistence returns yang model cm handles' + 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1') + 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2') + when: 'the subscription delete request is processed' + objectUnderTest.processSubscriptionDeleteRequest(subscriptionId) + then: 'the method to publish a dmi event is called with correct parameters' + 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_) + 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_) + and: 'the method to publish nmcp out event is called with correct parameters' + 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true) + } + + def 'Delete a subscriber for fully overlapping subscriptions'() { + given: 'a test subscription id' + def subscriptionId = 'test-id' + and: 'the persistence service returns datanodes with multiple subscribers' + 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >> + [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id','other-id']]), + new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id','other-id']])] + and: 'the inventory persistence returns yang model cm handles' + 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1') + 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2') + and: 'the cache handler returns the relevant maps whenever called' + 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]] + when: 'the subscription delete request is processed' + objectUnderTest.processSubscriptionDeleteRequest(subscriptionId) + then: 'the method to publish a dmi event is never called' + 0 * mockDmiInEventProducer.publishDmiInEvent(_,_,_,_) + and: 'the cache handler is called to remove subscriber from database per dmi' + 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1') + 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2') + and: 'the method to publish nmcp out event is called with correct parameters' + 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy index 2881737e82..9c24e2b005 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy @@ -100,7 +100,7 @@ class NcmpInEventConsumerSpec extends MessagingBaseSpec { and: 'the log indicates the task completed successfully' assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...' and: 'the subscription handler service is called once' - 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id',_) + 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id') } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy index d32d143ade..354e2af937 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy @@ -188,4 +188,17 @@ class CmSubscriptionPersistenceServiceSpec extends Specification { 'cm handle in same datastore is NOT used for other subscriptions' | [] || 1 } + def 'Get all nodes for subscription id'() { + given: 'the query service returns nodes for subscription id' + def expectedDataNode = new DataNode(xpath: '/some/xpath') + def queryServiceResponse = [expectedDataNode].asCollection() + 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', '//filter/subscriptionIds[text()=\'some-id\']', OMIT_DESCENDANTS) >> queryServiceResponse + when: 'retrieving all nodes for subscription id' + def result = objectUnderTest.getAllNodesForSubscriptionId('some-id') + then: 'the result returns correct number of datanodes' + assert result.size() == 1 + and: 'the attribute of the datanode is as expected' + assert result.iterator().next().xpath == expectedDataNode.xpath + } + } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy index 94c490ab07..4b536b9710 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy @@ -40,6 +40,7 @@ class DataJobServiceImplSpec extends Specification { def objectUnderTest = new DataJobServiceImpl(mockDmiSubJobRequestHandler, mockWriteRequestExaminer) def myDataJobMetadata = new DataJobMetadata('', '', '') + def authorization = 'my authorization header' def logger = Spy(ListAppender<ILoggingEvent>) @@ -54,7 +55,7 @@ class DataJobServiceImplSpec extends Specification { def 'Read data job request.'() { when: 'read data job request is processed' def readOperation = new ReadOperation('', '', '', [], [], '', '', 1) - objectUnderTest.readDataJob('my-job-id', myDataJobMetadata, new DataJobReadRequest([readOperation])) + objectUnderTest.readDataJob(authorization, 'my-job-id', myDataJobMetadata, new DataJobReadRequest([readOperation])) then: 'the data job id is correctly logged' def loggingEvent = logger.list[0] assert loggingEvent.level == Level.INFO @@ -67,11 +68,11 @@ class DataJobServiceImplSpec extends Specification { and: 'a map of producer key and dmi 3gpp write operation' def dmiWriteOperationsPerProducerKey = [:] when: 'write data job request is processed' - objectUnderTest.writeDataJob('my-job-id', myDataJobMetadata, dataJobWriteRequest) + objectUnderTest.writeDataJob(authorization, 'my-job-id', myDataJobMetadata, dataJobWriteRequest) then: 'the examiner service is called and a map is returned' 1 * mockWriteRequestExaminer.splitDmiWriteOperationsFromRequest('my-job-id', dataJobWriteRequest) >> dmiWriteOperationsPerProducerKey and: 'the dmi request handler is called with the result from the examiner' - 1 * mockDmiSubJobRequestHandler.sendRequestsToDmi('my-job-id', myDataJobMetadata, dmiWriteOperationsPerProducerKey) + 1 * mockDmiSubJobRequestHandler.sendRequestsToDmi(authorization, 'my-job-id', myDataJobMetadata, dmiWriteOperationsPerProducerKey) } def setupLogger() { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy new file mode 100644 index 0000000000..cc042988f6 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.datajobs + +import org.onap.cps.ncmp.impl.dmi.DmiProperties +import org.onap.cps.ncmp.impl.dmi.DmiRestClient +import org.onap.cps.ncmp.impl.dmi.UrlTemplateParameters +import reactor.core.publisher.Mono +import spock.lang.Specification + +class DataJobStatusServiceImplSpec extends Specification { + + def mockDmiRestClient = Mock(DmiRestClient) + def mockDmiProperties = Mock(DmiProperties) + def objectUnderTest = new DataJobStatusServiceImpl(mockDmiRestClient, mockDmiProperties) + + def setup() { + mockDmiProperties.dmiBasePath >> 'dmi' + } + + def 'Forward a data job status query to DMI.' () { + given: 'the required parameters for querying' + def dmiServiceName = 'some-dmi-service' + def requestId = 'some-request-id' + def dataProducerJobId = 'some-data-producer-job-id' + def dataJobId = 'some-data-job-id' + def authorization = 'my authorization header' + def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/dataJob/{requestId}/dataProducerJob/{dataProducerJobId}/status?dataProducerId={dataProducerId}', ['dataProducerJobId':'some-data-producer-job-id', 'dataProducerId':'some-data-job-id', 'requestId':'some-request-id']) + and: 'the rest client returns a status for the given parameters' + mockDmiRestClient.getDataJobStatus(urlParams, authorization) >> Mono.just('some status') + when: 'the job status is queried' + def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataJobId) + then: 'the status from the rest client is returned' + assert status == 'some status' + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy index 7005cc6b18..b3dd02dec3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy @@ -19,7 +19,6 @@ class DmiSubJobRequestHandlerSpec extends Specification { def mockDmiRestClient = Mock(DmiRestClient) def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) def mockDmiProperties = Mock(DmiProperties) - def static NO_AUTH = null def objectUnderTest = new DmiSubJobRequestHandler(mockDmiRestClient, mockDmiProperties, jsonObjectMapper) def 'Send a sub-job request to the DMI Plugin.'() { @@ -28,12 +27,13 @@ class DmiSubJobRequestHandlerSpec extends Specification { def dataJobMetadata = new DataJobMetadata('d1', 't1', 't2') def dmiWriteOperation = new DmiWriteOperation('p', 'operation', 'tag', null, 'o1', [:]) def dmiWriteOperationsPerProducerKey = [new ProducerKey('dmi1', 'prod1'): [dmiWriteOperation]] + def authorization = 'my authorization header' and: 'the dmi rest client will return a response (for the correct parameters)' def responseEntity = new ResponseEntity<>(new SubJobWriteResponse('my-sub-job-id', 'dmi1', 'prod1'), HttpStatus.OK) def expectedJson = '{"dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}' - mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, NO_AUTH) >> responseEntity + mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, authorization) >> responseEntity when: 'sending request to DMI invoked' - objectUnderTest.sendRequestsToDmi(dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey) + objectUnderTest.sendRequestsToDmi(authorization, dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey) then: 'the result contains the expected sub-job id' assert responseEntity.body.subJobId == 'my-sub-job-id' } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy index 3dadf23249..3444d7b86a 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy @@ -163,4 +163,16 @@ class DmiRestClientSpec extends Specification { 'DMI basic auth disabled, with NCMP bearer token' | false | BEARER_AUTH_HEADER || BEARER_AUTH_HEADER 'DMI basic auth disabled, with NCMP basic auth' | false | BASIC_AUTH_HEADER || NO_AUTH_HEADER } + + def 'DMI GET Operation for DMI Data Service '() { + given: 'the Data web client returns a valid response entity for the expected parameters' + mockDataServicesWebClient.get() >> mockRequestBody + def jsonNode = jsonObjectMapper.convertJsonString('{"status":"some status"}', JsonNode.class) + ((ObjectNode) jsonNode).put('status', 'some status') + mockResponse.bodyToMono(JsonNode.class) >> Mono.just(jsonNode) + when: 'GET operation is invoked for Data Service' + def response = objectUnderTest.getDataJobStatus(urlTemplateParameters, NO_AUTH_HEADER).block() + then: 'the response equals to the expected value' + assert response == 'some status' + } } |