diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
13 files changed, 347 insertions, 36 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java index 6ff79a9344..255b3847eb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java @@ -31,20 +31,27 @@ public interface DataJobService { /** * process read data job operations. * - * @param dataJobId Unique identifier of the job within the request + * @param authorization the authorization header from the REST request + * @param dataJobId unique identifier of the job within the request * @param dataJobMetadata data job request headers * @param dataJobReadRequest read data job request */ - void readDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobReadRequest dataJobReadRequest); + void readDataJob(String authorization, + String dataJobId, + DataJobMetadata dataJobMetadata, + DataJobReadRequest dataJobReadRequest); /** * process write data job operations. * - * @param dataJobId Unique identifier of the job within the request + * @param authorization the authorization header from the REST request + * @param dataJobId unique identifier of the job within the request * @param dataJobMetadata data job request headers * @param dataJobWriteRequest write data job request * @return a list of sub-job write responses */ - List<SubJobWriteResponse> writeDataJob(String dataJobId, DataJobMetadata dataJobMetadata, + List<SubJobWriteResponse> writeDataJob(String authorization, + String dataJobId, + DataJobMetadata dataJobMetadata, DataJobWriteRequest dataJobWriteRequest); }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java new file mode 100644 index 0000000000..50d96f858c --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.datajobs; + +/** + * Service interface to check the status of a data job. + * The operations interact with a DMI Plugin to retrieve data job statuses. + */ +public interface DataJobStatusService { + + /** + * Retrieves the current status of a specific data job. + * + * @param authorization The authorization header from the REST request. + * @param dmiServiceName The name of the DMI Service relevant to the data job. + * @param requestId The unique identifier for the overall data job request. + * @param dataProducerJobId The identifier of the data producer job within the DMI system. + * @param dataProducerId The ID of the producer registered by DMI, used for operations related to this request. + * This could include alternate IDs or specific identifiers. + * @return The current status of the data job as a String. + */ + String getDataJobStatus(final String authorization, + final String dmiServiceName, + final String requestId, + final String dataProducerJobId, + final String dataProducerId); +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java index c5052f1405..b5ab7f6525 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java @@ -50,7 +50,7 @@ public class DmiCacheHandler { private final InventoryPersistence inventoryPersistence; /** - * Adds new subscription to the subscription cache. + * Adds subscription to the subscription cache. * * @param subscriptionId subscription id * @param predicates subscription request predicates @@ -60,6 +60,18 @@ public class DmiCacheHandler { } /** + * Adds subscription to the subscription cache. + * + * @param subscriptionId subscription id + * @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi + */ + public void add(final String subscriptionId, + final Map<String, DmiCmSubscriptionDetails> + dmiCmSubscriptionDetailsPerDmi) { + cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi); + } + + /** * Get cm notification subscription cache entry via subscription id. * * @param subscriptionId subscription id @@ -122,8 +134,8 @@ public class DmiCacheHandler { * @param status String of status * */ - public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName, - final CmSubscriptionStatus status) { + public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName, + final CmSubscriptionStatus status) { final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = cmNotificationSubscriptionCache.get(subscriptionId); dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status); @@ -162,7 +174,7 @@ public class DmiCacheHandler { * @param dmiServiceName String of dmiServiceName * */ - public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) { + public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) { final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) .getDmiCmSubscriptionPredicates(); @@ -210,6 +222,6 @@ public class DmiCacheHandler { private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) { return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED") - || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED"); + || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED"); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java index 20ccf528ed..20c7c7b13d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java @@ -80,7 +80,7 @@ public class DmiOutEventConsumer { dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); } if (eventType.equals("subscriptionDeleteResponse")) { - dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName); + dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName); } handleEventsStatusPerDmi(subscriptionId, eventType); } @@ -96,7 +96,7 @@ public class DmiOutEventConsumer { private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName, final CmSubscriptionStatus cmSubscriptionStatus) { - dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, + dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, cmSubscriptionStatus); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java new file mode 100644 index 0000000000..cd4a15af47 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.models; + +import java.util.Collection; +import java.util.Map; + +/** + * Tuple to be used during for to delete usecase. + * + * @param lastRemainingSubscriptionsPerDmi subscriptions that are used by only one subscriber grouped per dmi + * @param overlappingSubscriptionsPerDmi subscriptions that are shared by multiple subscribers grouped per dmi + */ +public record DmiCmSubscriptionTuple(Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi, + Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi) { +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java index 3a9b2066b2..90c5c575e6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java @@ -37,8 +37,7 @@ public interface CmSubscriptionHandler { * Process cm notification subscription delete request. * * @param subscriptionId subscription id - * @param predicates subscription predicates */ - void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates); + void processSubscriptionDeleteRequest(final String subscriptionId); }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java index 9d33d25816..c2c71dbaae 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java @@ -21,34 +21,50 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper; import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper; import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey; import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple; import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; +import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; +import org.onap.cps.spi.model.DataNode; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { + private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile( + "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/" + + "filters/filter\\[@xpath='(.*)']$"); + private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService; private final CmSubscriptionComparator cmSubscriptionComparator; private final NcmpOutEventMapper ncmpOutEventMapper; private final DmiInEventMapper dmiInEventMapper; + private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper; private final NcmpOutEventProducer ncmpOutEventProducer; private final DmiInEventProducer dmiInEventProducer; private final DmiCacheHandler dmiCacheHandler; + private final InventoryPersistence inventoryPersistence; @Override public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) { @@ -62,10 +78,45 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { } @Override - public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) { - dmiCacheHandler.add(subscriptionId, predicates); - sendSubscriptionDeleteRequestToDmi(subscriptionId); - scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + public void processSubscriptionDeleteRequest(final String subscriptionId) { + final Collection<DataNode> subscriptionDataNodes = + cmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId); + final DmiCmSubscriptionTuple dmiCmSubscriptionTuple = + getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes); + dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple)); + if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) { + acceptAndPublishDeleteRequest(subscriptionId); + } else { + sendSubscriptionDeleteRequestToDmi(subscriptionId, + dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( + dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi())); + scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + } + } + + private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps( + final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) { + final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi = + dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( + dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()); + final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi = + dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi( + dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi()); + final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi = + new HashMap<>(lastRemainingDmiSubscriptionsPerDmi); + overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) -> + mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails, + this::mergeDmiCmSubscriptionDetails)); + return mergedDmiSubscriptionsPerDmi; + } + + private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails( + final DmiCmSubscriptionDetails dmiCmSubscriptionDetails, + final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) { + final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates = + new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); + mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); + return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING); } private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) { @@ -81,6 +132,19 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false); } + private void acceptAndPublishDeleteRequest(final String subscriptionId) { + final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet(); + for (final String dmiServiceName : dmiServiceNames) { + dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName, + CmSubscriptionStatus.ACCEPTED); + dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName); + } + final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, + dmiCacheHandler.get(subscriptionId)); + ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent, + false); + } + private void handleNewCmSubscription(final String subscriptionId) { final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId); @@ -98,26 +162,68 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { } private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName, - final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates); dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName, "subscriptionCreateRequest", dmiInEvent); } private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) { - dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, + dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED); dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); } - private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) { - final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = - dmiCacheHandler.get(subscriptionId); - dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { - final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent( - dmiSubscriptionDetails.getDmiCmSubscriptionPredicates()); - dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName, - "subscriptionDeleteRequest", dmiInEvent); + private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId, + final Map<String, DmiCmSubscriptionDetails> + dmiCmSubscriptionsPerDmi) { + dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> { + final DmiInEvent dmiInEvent = + dmiInEventMapper.toDmiInEvent( + dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates()); + dmiInEventProducer.publishDmiInEvent(subscriptionId, + dmiPluginName, "subscriptionDeleteRequest", dmiInEvent); }); } + + + private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi( + final Collection<DataNode> subscriptionNodes) { + final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>(); + final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>(); + + for (final DataNode subscriptionNode : subscriptionNodes) { + final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath()); + final String dmiServiceName = inventoryPersistence.getYangModelCmHandle( + dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName(); + final List<String> subscribers = (List<String>) subscriptionNode.getLeaves().get("subscriptionIds"); + populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi, + lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey); + } + return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi); + } + + private static void populateDmiCmSubscriptionTuple(final List<String> subscribers, + final Map<String, Collection<DmiCmSubscriptionKey>> + overlappingSubscriptionsPerDmi, + final Map<String, Collection<DmiCmSubscriptionKey>> + lastRemainingSubscriptionsPerDmi, + final String dmiServiceName, + final DmiCmSubscriptionKey dmiCmSubscriptionKey) { + final Map<String, Collection<DmiCmSubscriptionKey>> targetMap = + subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi; + targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey); + } + + private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) { + final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath); + if (matcher.find()) { + final String datastoreName = matcher.group(1); + final String cmHandleId = matcher.group(2); + final String filterXpath = matcher.group(3); + return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath); + } + throw new IllegalArgumentException("DataNode xpath does not represent a subscription key"); + } + }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java index 1e1359dd0d..cba64e0e94 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java @@ -63,7 +63,7 @@ public class NcmpInEventConsumer { if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) { log.info("Subscription delete request for source {} with subscription id {} ...", cloudEvent.getSource(), subscriptionId); - cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates); + cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java index c24507a1a7..c71109013a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java @@ -159,6 +159,18 @@ public class CmSubscriptionPersistenceService { } } + /** + * Retrieve all existing dataNodes for given subscription id. + * + * @param subscriptionId subscription id + * @return collection of DataNodes + */ + public Collection<DataNode> getAllNodesForSubscriptionId(final String subscriptionId) { + return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), + OMIT_DESCENDANTS); + } + private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId, final String xpath) { cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java index 56ed6e30da..04c3ad2fc6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java @@ -42,19 +42,26 @@ public class DataJobServiceImpl implements DataJobService { private final WriteRequestExaminer writeRequestExaminer; @Override - public void readDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata, + public void readDataJob(final String authorization, + final String dataJobId, + final DataJobMetadata dataJobMetadata, final DataJobReadRequest dataJobReadRequest) { log.info("data job id for read operation is: {}", dataJobId); } @Override - public List<SubJobWriteResponse> writeDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata, + public List<SubJobWriteResponse> writeDataJob(final String authorization, + final String dataJobId, + final DataJobMetadata dataJobMetadata, final DataJobWriteRequest dataJobWriteRequest) { log.info("data job id for write operation is: {}", dataJobId); final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey = writeRequestExaminer.splitDmiWriteOperationsFromRequest(dataJobId, dataJobWriteRequest); - return dmiSubJobClient.sendRequestsToDmi(dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey); + return dmiSubJobClient.sendRequestsToDmi(authorization, + dataJobId, + dataJobMetadata, + dmiWriteOperationsPerProducerKey); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java new file mode 100644 index 0000000000..a6ecaa1097 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.datajobs; + +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.api.datajobs.DataJobStatusService; +import org.onap.cps.ncmp.impl.dmi.DmiProperties; +import org.onap.cps.ncmp.impl.dmi.DmiRestClient; +import org.onap.cps.ncmp.impl.dmi.DmiServiceUrlTemplateBuilder; +import org.onap.cps.ncmp.impl.dmi.UrlTemplateParameters; +import org.springframework.stereotype.Service; + +/** + * Implementation of {@link DataJobStatusService} interface. + * The operations interact with a DMI Plugin to retrieve data job statuses. + */ +@Service +@RequiredArgsConstructor +public class DataJobStatusServiceImpl implements DataJobStatusService { + + private final DmiRestClient dmiRestClient; + private final DmiProperties dmiProperties; + + @Override + public String getDataJobStatus(final String authorization, + final String dmiServiceName, + final String requestId, + final String dataProducerJobId, + final String dataProducerId) { + + final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, requestId, + dataProducerJobId, dataProducerId); + return dmiRestClient.getDataJobStatus(urlTemplateParameters, authorization).block(); + } + + private UrlTemplateParameters buildUrlParameters(final String dmiServiceName, + final String requestId, + final String dataProducerJobId, + final String dataProducerId) { + return DmiServiceUrlTemplateBuilder.newInstance() + .fixedPathSegment("dataJob") + .variablePathSegment("requestId", requestId) + .fixedPathSegment("dataProducerJob") + .variablePathSegment("dataProducerJobId", dataProducerJobId) + .fixedPathSegment("status") + .queryParameter("dataProducerId", dataProducerId) + .createUrlTemplateParameters(dmiServiceName, dmiProperties.getDmiBasePath()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java index 1624ce8ae6..c93709ce75 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java @@ -48,17 +48,19 @@ public class DmiSubJobRequestHandler { private final DmiRestClient dmiRestClient; private final DmiProperties dmiProperties; private final JsonObjectMapper jsonObjectMapper; - static final String NO_AUTH_HEADER = null; /** * Sends sub-job write requests to the DMI Plugin. * - * @param dataJobId data ojb identifier + * @param authorization the authorization header from the REST request + * @param dataJobId data job identifier * @param dataJobMetadata the data job's metadata - * @param dmiWriteOperationsPerProducerKey a collection of write requests per producer key. + * @param dmiWriteOperationsPerProducerKey a collection of write requests per producer key * @return a list of sub-job write responses */ - public List<SubJobWriteResponse> sendRequestsToDmi(final String dataJobId, final DataJobMetadata dataJobMetadata, + public List<SubJobWriteResponse> sendRequestsToDmi(final String authorization, + final String dataJobId, + final DataJobMetadata dataJobMetadata, final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey) { final List<SubJobWriteResponse> subJobWriteResponses = new ArrayList<>(dmiWriteOperationsPerProducerKey.size()); dmiWriteOperationsPerProducerKey.forEach((producerKey, dmi3ggpWriteOperations) -> { @@ -71,7 +73,7 @@ public class DmiSubJobRequestHandler { urlTemplateParameters, jsonObjectMapper.asJsonString(subJobWriteRequest), OperationType.CREATE, - NO_AUTH_HEADER); + authorization); final SubJobWriteResponse subJobWriteResponse = jsonObjectMapper .convertToValueType(responseEntity.getBody(), SubJobWriteResponse.class); log.debug("Sub job write response: {}", subJobWriteResponse); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java index 7ac85cbf84..ba6bba9c53 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java @@ -144,6 +144,26 @@ public class DmiRestClient { .defaultIfEmpty(NOT_SPECIFIED); } + /** + * Retrieves the status of a data job from the DMI service. + * + * @param urlTemplateParameters The URL template parameters for the DMI data job status endpoint. + * @param authorization The authorization token to be added to the request headers. + * @return A Mono emitting the status of the data job as a String. + * @throws DmiClientRequestException If there is an error during the DMI request. + */ + public Mono<String> getDataJobStatus(final UrlTemplateParameters urlTemplateParameters, + final String authorization) { + + return dataServicesWebClient.get() + .uri(urlTemplateParameters.urlTemplate(), urlTemplateParameters.urlVariables()) + .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) + .retrieve() + .bodyToMono(JsonNode.class) + .map(responseHealthStatus -> responseHealthStatus.path("status").asText()) + .onErrorMap(throwable -> handleDmiClientException(throwable, OperationType.READ.getOperationName())); + } + private WebClient getWebClient(final RequiredDmiService requiredDmiService) { return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient; } |