summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java15
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java45
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java22
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java34
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java134
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java2
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java12
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java13
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java67
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java12
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java20
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy29
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy70
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy13
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy7
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy54
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy6
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy12
22 files changed, 506 insertions, 72 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java
index 6ff79a9344..255b3847eb 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobService.java
@@ -31,20 +31,27 @@ public interface DataJobService {
/**
* process read data job operations.
*
- * @param dataJobId Unique identifier of the job within the request
+ * @param authorization the authorization header from the REST request
+ * @param dataJobId unique identifier of the job within the request
* @param dataJobMetadata data job request headers
* @param dataJobReadRequest read data job request
*/
- void readDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobReadRequest dataJobReadRequest);
+ void readDataJob(String authorization,
+ String dataJobId,
+ DataJobMetadata dataJobMetadata,
+ DataJobReadRequest dataJobReadRequest);
/**
* process write data job operations.
*
- * @param dataJobId Unique identifier of the job within the request
+ * @param authorization the authorization header from the REST request
+ * @param dataJobId unique identifier of the job within the request
* @param dataJobMetadata data job request headers
* @param dataJobWriteRequest write data job request
* @return a list of sub-job write responses
*/
- List<SubJobWriteResponse> writeDataJob(String dataJobId, DataJobMetadata dataJobMetadata,
+ List<SubJobWriteResponse> writeDataJob(String authorization,
+ String dataJobId,
+ DataJobMetadata dataJobMetadata,
DataJobWriteRequest dataJobWriteRequest);
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
new file mode 100644
index 0000000000..50d96f858c
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.datajobs;
+
+/**
+ * Service interface to check the status of a data job.
+ * The operations interact with a DMI Plugin to retrieve data job statuses.
+ */
+public interface DataJobStatusService {
+
+ /**
+ * Retrieves the current status of a specific data job.
+ *
+ * @param authorization The authorization header from the REST request.
+ * @param dmiServiceName The name of the DMI Service relevant to the data job.
+ * @param requestId The unique identifier for the overall data job request.
+ * @param dataProducerJobId The identifier of the data producer job within the DMI system.
+ * @param dataProducerId The ID of the producer registered by DMI, used for operations related to this request.
+ * This could include alternate IDs or specific identifiers.
+ * @return The current status of the data job as a String.
+ */
+ String getDataJobStatus(final String authorization,
+ final String dmiServiceName,
+ final String requestId,
+ final String dataProducerJobId,
+ final String dataProducerId);
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
index c5052f1405..b5ab7f6525 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
@@ -50,7 +50,7 @@ public class DmiCacheHandler {
private final InventoryPersistence inventoryPersistence;
/**
- * Adds new subscription to the subscription cache.
+ * Adds subscription to the subscription cache.
*
* @param subscriptionId subscription id
* @param predicates subscription request predicates
@@ -60,6 +60,18 @@ public class DmiCacheHandler {
}
/**
+ * Adds subscription to the subscription cache.
+ *
+ * @param subscriptionId subscription id
+ * @param dmiCmSubscriptionDetailsPerDmi map of dmi cm notification subscription details per dmi
+ */
+ public void add(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails>
+ dmiCmSubscriptionDetailsPerDmi) {
+ cmNotificationSubscriptionCache.put(subscriptionId, dmiCmSubscriptionDetailsPerDmi);
+ }
+
+ /**
* Get cm notification subscription cache entry via subscription id.
*
* @param subscriptionId subscription id
@@ -122,8 +134,8 @@ public class DmiCacheHandler {
* @param status String of status
*
*/
- public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName,
- final CmSubscriptionStatus status) {
+ public void updateDmiSubscriptionStatus(final String subscriptionId, final String dmiServiceName,
+ final CmSubscriptionStatus status) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
cmNotificationSubscriptionCache.get(subscriptionId);
dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status);
@@ -162,7 +174,7 @@ public class DmiCacheHandler {
* @param dmiServiceName String of dmiServiceName
*
*/
- public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
+ public void removeFromDatabase(final String subscriptionId, final String dmiServiceName) {
final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
.getDmiCmSubscriptionPredicates();
@@ -210,6 +222,6 @@ public class DmiCacheHandler {
private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) {
return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED")
- || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
+ || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
index 20ccf528ed..20c7c7b13d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
@@ -80,7 +80,7 @@ public class DmiOutEventConsumer {
dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
}
if (eventType.equals("subscriptionDeleteResponse")) {
- dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
+ dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName);
}
handleEventsStatusPerDmi(subscriptionId, eventType);
}
@@ -96,7 +96,7 @@ public class DmiOutEventConsumer {
private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
final CmSubscriptionStatus cmSubscriptionStatus) {
- dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
cmSubscriptionStatus);
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java
new file mode 100644
index 0000000000..cd4a15af47
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionTuple.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Tuple to be used during for to delete usecase.
+ *
+ * @param lastRemainingSubscriptionsPerDmi subscriptions that are used by only one subscriber grouped per dmi
+ * @param overlappingSubscriptionsPerDmi subscriptions that are shared by multiple subscribers grouped per dmi
+ */
+public record DmiCmSubscriptionTuple(Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi,
+ Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi) {
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
index 3a9b2066b2..90c5c575e6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
@@ -37,8 +37,7 @@ public interface CmSubscriptionHandler {
* Process cm notification subscription delete request.
*
* @param subscriptionId subscription id
- * @param predicates subscription predicates
*/
- void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates);
+ void processSubscriptionDeleteRequest(final String subscriptionId);
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
index 9d33d25816..c2c71dbaae 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
@@ -21,34 +21,50 @@
package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionKey;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionTuple;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
+import org.onap.cps.spi.model.DataNode;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
+ private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile(
+ "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
+ + "filters/filter\\[@xpath='(.*)']$");
+
private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
private final CmSubscriptionComparator cmSubscriptionComparator;
private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiInEventMapper dmiInEventMapper;
+ private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper;
private final NcmpOutEventProducer ncmpOutEventProducer;
private final DmiInEventProducer dmiInEventProducer;
private final DmiCacheHandler dmiCacheHandler;
+ private final InventoryPersistence inventoryPersistence;
@Override
public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
@@ -62,10 +78,45 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
}
@Override
- public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) {
- dmiCacheHandler.add(subscriptionId, predicates);
- sendSubscriptionDeleteRequestToDmi(subscriptionId);
- scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ public void processSubscriptionDeleteRequest(final String subscriptionId) {
+ final Collection<DataNode> subscriptionDataNodes =
+ cmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId);
+ final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
+ getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
+ dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
+ if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
+ acceptAndPublishDeleteRequest(subscriptionId);
+ } else {
+ sendSubscriptionDeleteRequestToDmi(subscriptionId,
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()));
+ scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ }
+ }
+
+ private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps(
+ final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) {
+ final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi =
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi());
+ final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi =
+ dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
+ dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi());
+ final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi =
+ new HashMap<>(lastRemainingDmiSubscriptionsPerDmi);
+ overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) ->
+ mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails,
+ this::mergeDmiCmSubscriptionDetails));
+ return mergedDmiSubscriptionsPerDmi;
+ }
+
+ private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails(
+ final DmiCmSubscriptionDetails dmiCmSubscriptionDetails,
+ final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) {
+ final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates =
+ new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING);
}
private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
@@ -81,6 +132,19 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
}
+ private void acceptAndPublishDeleteRequest(final String subscriptionId) {
+ final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
+ for (final String dmiServiceName : dmiServiceNames) {
+ dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
+ CmSubscriptionStatus.ACCEPTED);
+ dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName);
+ }
+ final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
+ dmiCacheHandler.get(subscriptionId));
+ ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
+ false);
+ }
+
private void handleNewCmSubscription(final String subscriptionId) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
dmiCacheHandler.get(subscriptionId);
@@ -98,26 +162,68 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
}
private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName,
"subscriptionCreateRequest", dmiInEvent);
}
private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) {
- dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
CmSubscriptionStatus.ACCEPTED);
dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
}
- private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) {
- final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
- dmiCacheHandler.get(subscriptionId);
- dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
- final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(
- dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
- dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName,
- "subscriptionDeleteRequest", dmiInEvent);
+ private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails>
+ dmiCmSubscriptionsPerDmi) {
+ dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> {
+ final DmiInEvent dmiInEvent =
+ dmiInEventMapper.toDmiInEvent(
+ dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ dmiInEventProducer.publishDmiInEvent(subscriptionId,
+ dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
});
}
+
+
+ private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi(
+ final Collection<DataNode> subscriptionNodes) {
+ final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>();
+ final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>();
+
+ for (final DataNode subscriptionNode : subscriptionNodes) {
+ final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath());
+ final String dmiServiceName = inventoryPersistence.getYangModelCmHandle(
+ dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName();
+ final List<String> subscribers = (List<String>) subscriptionNode.getLeaves().get("subscriptionIds");
+ populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi,
+ lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey);
+ }
+ return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi);
+ }
+
+ private static void populateDmiCmSubscriptionTuple(final List<String> subscribers,
+ final Map<String, Collection<DmiCmSubscriptionKey>>
+ overlappingSubscriptionsPerDmi,
+ final Map<String, Collection<DmiCmSubscriptionKey>>
+ lastRemainingSubscriptionsPerDmi,
+ final String dmiServiceName,
+ final DmiCmSubscriptionKey dmiCmSubscriptionKey) {
+ final Map<String, Collection<DmiCmSubscriptionKey>> targetMap =
+ subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi;
+ targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey);
+ }
+
+ private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) {
+ final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath);
+ if (matcher.find()) {
+ final String datastoreName = matcher.group(1);
+ final String cmHandleId = matcher.group(2);
+ final String filterXpath = matcher.group(3);
+ return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath);
+ }
+ throw new IllegalArgumentException("DataNode xpath does not represent a subscription key");
+ }
+
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
index 1e1359dd0d..cba64e0e94 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
@@ -63,7 +63,7 @@ public class NcmpInEventConsumer {
if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) {
log.info("Subscription delete request for source {} with subscription id {} ...",
cloudEvent.getSource(), subscriptionId);
- cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates);
+ cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId);
}
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
index c24507a1a7..c71109013a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
@@ -159,6 +159,18 @@ public class CmSubscriptionPersistenceService {
}
}
+ /**
+ * Retrieve all existing dataNodes for given subscription id.
+ *
+ * @param subscriptionId subscription id
+ * @return collection of DataNodes
+ */
+ public Collection<DataNode> getAllNodesForSubscriptionId(final String subscriptionId) {
+ return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId),
+ OMIT_DESCENDANTS);
+ }
+
private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
final String xpath) {
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java
index 56ed6e30da..04c3ad2fc6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImpl.java
@@ -42,19 +42,26 @@ public class DataJobServiceImpl implements DataJobService {
private final WriteRequestExaminer writeRequestExaminer;
@Override
- public void readDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ public void readDataJob(final String authorization,
+ final String dataJobId,
+ final DataJobMetadata dataJobMetadata,
final DataJobReadRequest dataJobReadRequest) {
log.info("data job id for read operation is: {}", dataJobId);
}
@Override
- public List<SubJobWriteResponse> writeDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ public List<SubJobWriteResponse> writeDataJob(final String authorization,
+ final String dataJobId,
+ final DataJobMetadata dataJobMetadata,
final DataJobWriteRequest dataJobWriteRequest) {
log.info("data job id for write operation is: {}", dataJobId);
final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey =
writeRequestExaminer.splitDmiWriteOperationsFromRequest(dataJobId, dataJobWriteRequest);
- return dmiSubJobClient.sendRequestsToDmi(dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey);
+ return dmiSubJobClient.sendRequestsToDmi(authorization,
+ dataJobId,
+ dataJobMetadata,
+ dmiWriteOperationsPerProducerKey);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
new file mode 100644
index 0000000000..a6ecaa1097
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.datajobs;
+
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.datajobs.DataJobStatusService;
+import org.onap.cps.ncmp.impl.dmi.DmiProperties;
+import org.onap.cps.ncmp.impl.dmi.DmiRestClient;
+import org.onap.cps.ncmp.impl.dmi.DmiServiceUrlTemplateBuilder;
+import org.onap.cps.ncmp.impl.dmi.UrlTemplateParameters;
+import org.springframework.stereotype.Service;
+
+/**
+ * Implementation of {@link DataJobStatusService} interface.
+ * The operations interact with a DMI Plugin to retrieve data job statuses.
+ */
+@Service
+@RequiredArgsConstructor
+public class DataJobStatusServiceImpl implements DataJobStatusService {
+
+ private final DmiRestClient dmiRestClient;
+ private final DmiProperties dmiProperties;
+
+ @Override
+ public String getDataJobStatus(final String authorization,
+ final String dmiServiceName,
+ final String requestId,
+ final String dataProducerJobId,
+ final String dataProducerId) {
+
+ final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, requestId,
+ dataProducerJobId, dataProducerId);
+ return dmiRestClient.getDataJobStatus(urlTemplateParameters, authorization).block();
+ }
+
+ private UrlTemplateParameters buildUrlParameters(final String dmiServiceName,
+ final String requestId,
+ final String dataProducerJobId,
+ final String dataProducerId) {
+ return DmiServiceUrlTemplateBuilder.newInstance()
+ .fixedPathSegment("dataJob")
+ .variablePathSegment("requestId", requestId)
+ .fixedPathSegment("dataProducerJob")
+ .variablePathSegment("dataProducerJobId", dataProducerJobId)
+ .fixedPathSegment("status")
+ .queryParameter("dataProducerId", dataProducerId)
+ .createUrlTemplateParameters(dmiServiceName, dmiProperties.getDmiBasePath());
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
index 1624ce8ae6..c93709ce75 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
@@ -48,17 +48,19 @@ public class DmiSubJobRequestHandler {
private final DmiRestClient dmiRestClient;
private final DmiProperties dmiProperties;
private final JsonObjectMapper jsonObjectMapper;
- static final String NO_AUTH_HEADER = null;
/**
* Sends sub-job write requests to the DMI Plugin.
*
- * @param dataJobId data ojb identifier
+ * @param authorization the authorization header from the REST request
+ * @param dataJobId data job identifier
* @param dataJobMetadata the data job's metadata
- * @param dmiWriteOperationsPerProducerKey a collection of write requests per producer key.
+ * @param dmiWriteOperationsPerProducerKey a collection of write requests per producer key
* @return a list of sub-job write responses
*/
- public List<SubJobWriteResponse> sendRequestsToDmi(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ public List<SubJobWriteResponse> sendRequestsToDmi(final String authorization,
+ final String dataJobId,
+ final DataJobMetadata dataJobMetadata,
final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey) {
final List<SubJobWriteResponse> subJobWriteResponses = new ArrayList<>(dmiWriteOperationsPerProducerKey.size());
dmiWriteOperationsPerProducerKey.forEach((producerKey, dmi3ggpWriteOperations) -> {
@@ -71,7 +73,7 @@ public class DmiSubJobRequestHandler {
urlTemplateParameters,
jsonObjectMapper.asJsonString(subJobWriteRequest),
OperationType.CREATE,
- NO_AUTH_HEADER);
+ authorization);
final SubJobWriteResponse subJobWriteResponse = jsonObjectMapper
.convertToValueType(responseEntity.getBody(), SubJobWriteResponse.class);
log.debug("Sub job write response: {}", subJobWriteResponse);
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
index 7ac85cbf84..ba6bba9c53 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
@@ -144,6 +144,26 @@ public class DmiRestClient {
.defaultIfEmpty(NOT_SPECIFIED);
}
+ /**
+ * Retrieves the status of a data job from the DMI service.
+ *
+ * @param urlTemplateParameters The URL template parameters for the DMI data job status endpoint.
+ * @param authorization The authorization token to be added to the request headers.
+ * @return A Mono emitting the status of the data job as a String.
+ * @throws DmiClientRequestException If there is an error during the DMI request.
+ */
+ public Mono<String> getDataJobStatus(final UrlTemplateParameters urlTemplateParameters,
+ final String authorization) {
+
+ return dataServicesWebClient.get()
+ .uri(urlTemplateParameters.urlTemplate(), urlTemplateParameters.urlVariables())
+ .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
+ .retrieve()
+ .bodyToMono(JsonNode.class)
+ .map(responseHealthStatus -> responseHealthStatus.path("status").asText())
+ .onErrorMap(throwable -> handleDmiClientException(throwable, OperationType.READ.getOperationName()));
+ }
+
private WebClient getWebClient(final RequiredDmiService requiredDmiService) {
return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient;
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy
index 2d50e770dd..791a154608 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy
@@ -65,17 +65,34 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec {
initialiseMockInventoryPersistenceResponses()
}
- def 'Load CM subscription event to cache'() {
- given: 'a valid subscription event with Id'
+ def 'Load CM subscription event to cache with predicates'() {
+ given: 'a subscription event with id'
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
and: 'list of predicates'
def predicates = ncmpInEvent.getData().getPredicates()
- when: 'a valid event object loaded in cache'
+ when: 'subscription is loaded to cache with predicates'
objectUnderTest.add(subscriptionId, predicates)
- then: 'the cache contains the correct entry with #subscriptionId subscription ID'
+ then: 'the number of entries in cache is correct'
+ assert testCache.size() == 1
+ and: 'the cache contains the correct entries'
assert testCache.containsKey(subscriptionId)
}
+ def 'Load CM subscription event to cache with dmi subscription details per dmi'() {
+ given: 'a subscription event with id'
+ def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
+ and: 'dmi subscription details per dmi'
+ def dmiSubscriptionsPerDmi = [:]
+ when: 'subscription is loaded to cache with dmi subscription details per dmi'
+ objectUnderTest.add(subscriptionId, dmiSubscriptionsPerDmi)
+ then: 'the number of entries in cache is correct'
+ assert testCache.size() == 1
+ and: 'the cache contains the correct entries'
+ assert testCache.containsKey(subscriptionId)
+ and: 'the entry for the subscription ID matches the provided DMI subscription details'
+ assert testCache.get(subscriptionId) == dmiSubscriptionsPerDmi
+ }
+
def 'Get cache entry via subscription id'() {
given: 'the cache contains value for some-id'
testCache.put('some-id',[:])
@@ -157,7 +174,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec {
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription status per dmi is updated in cache'
- objectUnderTest.updateDmiSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED)
+ objectUnderTest.updateDmiSubscriptionStatus(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED)
then: 'verify status has been updated in cache'
def predicate = testCache.get(subscriptionId)
assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED
@@ -180,7 +197,7 @@ class DmiCacheHandlerSpec extends MessagingBaseSpec {
def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription is persisted in database'
- objectUnderTest.removeFromDatabasePerDmi(subscriptionId,'dmi-1')
+ objectUnderTest.removeFromDatabase(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
4 * mockCmSubscriptionPersistenceService.removeCmSubscription(_,_,_,subscriptionId)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy
index 6e28d14810..bcf8780873 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy
@@ -103,7 +103,7 @@ class DmiOutEventConsumerSpec extends MessagingBaseSpec {
when: 'the event is consumed'
objectUnderTest.consumeDmiOutEvent(consumerRecord)
then: 'correct number of calls to cache'
- expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatus('sub-1','test-dmi-plugin-name', subscriptionStatus)
and: 'correct number of calls to persist cache'
expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
and: 'correct number of calls to map the ncmp out event'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
index 3f6556d47e..f902c60482 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper
import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
@@ -30,7 +31,10 @@ import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPer
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.spi.model.DataNode
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
@@ -45,13 +49,15 @@ class CmSubscriptionHandlerImplSpec extends Specification {
def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator)
def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
def mockDmiInEventMapper = Mock(DmiInEventMapper)
+ def dmiCmSubscriptionDetailsPerDmiMapper = new DmiCmSubscriptionDetailsPerDmiMapper()
def mockNcmpOutEventProducer = Mock(NcmpOutEventProducer)
def mockDmiInEventProducer = Mock(DmiInEventProducer)
def mockDmiCacheHandler = Mock(DmiCacheHandler)
+ def mockInventoryPersistence = Mock(InventoryPersistence)
def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService,
- mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper,
- mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler)
+ mockCmSubscriptionComparator, mockNcmpOutEventMapper, mockDmiInEventMapper, dmiCmSubscriptionDetailsPerDmiMapper,
+ mockNcmpOutEventProducer, mockDmiInEventProducer, mockDmiCacheHandler, mockInventoryPersistence)
def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)]
@@ -97,7 +103,7 @@ class CmSubscriptionHandlerImplSpec extends Specification {
then: 'the subscription cache handler is called once'
1 * mockDmiCacheHandler.add('test-id', _)
and: 'the subscription details are updated in the cache'
- 1 * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('test-id', _, ACCEPTED)
+ 1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED)
and: 'we schedule to send the response after configured time from the cache'
1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
}
@@ -123,24 +129,44 @@ class CmSubscriptionHandlerImplSpec extends Specification {
}
def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
- given: 'a cmNotificationSubscriptionNcmp in event for delete'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionDeleteRequest(subscriptionId, predicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCacheHandler.add('test-id', predicates)
- and: 'the mapper handler to get DMI in event is called once'
- 1 * mockDmiInEventMapper.toDmiInEvent(_)
- and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
- testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.publishDmiInEvent(
- 'test-id', 'dmi-1', 'subscriptionDeleteRequest', _)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionDeleteResponse', null, true)
+ given: 'a test subscription id'
+ def subscriptionId = 'test-id'
+ and: 'the persistence service returns datanodes'
+ 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+ [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id']]),
+ new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id']])]
+ and: 'the inventory persistence returns yang model cm handles'
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
+ when: 'the subscription delete request is processed'
+ objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
+ then: 'the method to publish a dmi event is called with correct parameters'
+ 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
+ 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
+ and: 'the method to publish nmcp out event is called with correct parameters'
+ 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
+ }
+
+ def 'Delete a subscriber for fully overlapping subscriptions'() {
+ given: 'a test subscription id'
+ def subscriptionId = 'test-id'
+ and: 'the persistence service returns datanodes with multiple subscribers'
+ 1 * mockCmSubscriptionPersistenceService.getAllNodesForSubscriptionId(subscriptionId) >>
+ [new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='x/y']", leaves: ['xpath': 'x/y', 'subscriptionIds': ['test-id','other-id']]),
+ new DataNode(xpath: "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-2']/filters/filter[@xpath='y/z']", leaves: ['xpath': 'y/z', 'subscriptionIds': ['test-id','other-id']])]
+ and: 'the inventory persistence returns yang model cm handles'
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-1') >> new YangModelCmHandle(dmiServiceName: 'dmi-1')
+ 1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
+ and: 'the cache handler returns the relevant maps whenever called'
+ 2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]]
+ when: 'the subscription delete request is processed'
+ objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
+ then: 'the method to publish a dmi event is never called'
+ 0 * mockDmiInEventProducer.publishDmiInEvent(_,_,_,_)
+ and: 'the cache handler is called to remove subscriber from database per dmi'
+ 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1')
+ 1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2')
+ and: 'the method to publish nmcp out event is called with correct parameters'
+ 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy
index 2881737e82..9c24e2b005 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy
@@ -100,7 +100,7 @@ class NcmpInEventConsumerSpec extends MessagingBaseSpec {
and: 'the log indicates the task completed successfully'
assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...'
and: 'the subscription handler service is called once'
- 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id',_)
+ 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id')
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy
index d32d143ade..354e2af937 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy
@@ -188,4 +188,17 @@ class CmSubscriptionPersistenceServiceSpec extends Specification {
'cm handle in same datastore is NOT used for other subscriptions' | [] || 1
}
+ def 'Get all nodes for subscription id'() {
+ given: 'the query service returns nodes for subscription id'
+ def expectedDataNode = new DataNode(xpath: '/some/xpath')
+ def queryServiceResponse = [expectedDataNode].asCollection()
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', '//filter/subscriptionIds[text()=\'some-id\']', OMIT_DESCENDANTS) >> queryServiceResponse
+ when: 'retrieving all nodes for subscription id'
+ def result = objectUnderTest.getAllNodesForSubscriptionId('some-id')
+ then: 'the result returns correct number of datanodes'
+ assert result.size() == 1
+ and: 'the attribute of the datanode is as expected'
+ assert result.iterator().next().xpath == expectedDataNode.xpath
+ }
+
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy
index 94c490ab07..4b536b9710 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobServiceImplSpec.groovy
@@ -40,6 +40,7 @@ class DataJobServiceImplSpec extends Specification {
def objectUnderTest = new DataJobServiceImpl(mockDmiSubJobRequestHandler, mockWriteRequestExaminer)
def myDataJobMetadata = new DataJobMetadata('', '', '')
+ def authorization = 'my authorization header'
def logger = Spy(ListAppender<ILoggingEvent>)
@@ -54,7 +55,7 @@ class DataJobServiceImplSpec extends Specification {
def 'Read data job request.'() {
when: 'read data job request is processed'
def readOperation = new ReadOperation('', '', '', [], [], '', '', 1)
- objectUnderTest.readDataJob('my-job-id', myDataJobMetadata, new DataJobReadRequest([readOperation]))
+ objectUnderTest.readDataJob(authorization, 'my-job-id', myDataJobMetadata, new DataJobReadRequest([readOperation]))
then: 'the data job id is correctly logged'
def loggingEvent = logger.list[0]
assert loggingEvent.level == Level.INFO
@@ -67,11 +68,11 @@ class DataJobServiceImplSpec extends Specification {
and: 'a map of producer key and dmi 3gpp write operation'
def dmiWriteOperationsPerProducerKey = [:]
when: 'write data job request is processed'
- objectUnderTest.writeDataJob('my-job-id', myDataJobMetadata, dataJobWriteRequest)
+ objectUnderTest.writeDataJob(authorization, 'my-job-id', myDataJobMetadata, dataJobWriteRequest)
then: 'the examiner service is called and a map is returned'
1 * mockWriteRequestExaminer.splitDmiWriteOperationsFromRequest('my-job-id', dataJobWriteRequest) >> dmiWriteOperationsPerProducerKey
and: 'the dmi request handler is called with the result from the examiner'
- 1 * mockDmiSubJobRequestHandler.sendRequestsToDmi('my-job-id', myDataJobMetadata, dmiWriteOperationsPerProducerKey)
+ 1 * mockDmiSubJobRequestHandler.sendRequestsToDmi(authorization, 'my-job-id', myDataJobMetadata, dmiWriteOperationsPerProducerKey)
}
def setupLogger() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
new file mode 100644
index 0000000000..cc042988f6
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.datajobs
+
+import org.onap.cps.ncmp.impl.dmi.DmiProperties
+import org.onap.cps.ncmp.impl.dmi.DmiRestClient
+import org.onap.cps.ncmp.impl.dmi.UrlTemplateParameters
+import reactor.core.publisher.Mono
+import spock.lang.Specification
+
+class DataJobStatusServiceImplSpec extends Specification {
+
+ def mockDmiRestClient = Mock(DmiRestClient)
+ def mockDmiProperties = Mock(DmiProperties)
+ def objectUnderTest = new DataJobStatusServiceImpl(mockDmiRestClient, mockDmiProperties)
+
+ def setup() {
+ mockDmiProperties.dmiBasePath >> 'dmi'
+ }
+
+ def 'Forward a data job status query to DMI.' () {
+ given: 'the required parameters for querying'
+ def dmiServiceName = 'some-dmi-service'
+ def requestId = 'some-request-id'
+ def dataProducerJobId = 'some-data-producer-job-id'
+ def dataJobId = 'some-data-job-id'
+ def authorization = 'my authorization header'
+ def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/dataJob/{requestId}/dataProducerJob/{dataProducerJobId}/status?dataProducerId={dataProducerId}', ['dataProducerJobId':'some-data-producer-job-id', 'dataProducerId':'some-data-job-id', 'requestId':'some-request-id'])
+ and: 'the rest client returns a status for the given parameters'
+ mockDmiRestClient.getDataJobStatus(urlParams, authorization) >> Mono.just('some status')
+ when: 'the job status is queried'
+ def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataJobId)
+ then: 'the status from the rest client is returned'
+ assert status == 'some status'
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
index 7005cc6b18..b3dd02dec3 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
@@ -19,7 +19,6 @@ class DmiSubJobRequestHandlerSpec extends Specification {
def mockDmiRestClient = Mock(DmiRestClient)
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
def mockDmiProperties = Mock(DmiProperties)
- def static NO_AUTH = null
def objectUnderTest = new DmiSubJobRequestHandler(mockDmiRestClient, mockDmiProperties, jsonObjectMapper)
def 'Send a sub-job request to the DMI Plugin.'() {
@@ -28,12 +27,13 @@ class DmiSubJobRequestHandlerSpec extends Specification {
def dataJobMetadata = new DataJobMetadata('d1', 't1', 't2')
def dmiWriteOperation = new DmiWriteOperation('p', 'operation', 'tag', null, 'o1', [:])
def dmiWriteOperationsPerProducerKey = [new ProducerKey('dmi1', 'prod1'): [dmiWriteOperation]]
+ def authorization = 'my authorization header'
and: 'the dmi rest client will return a response (for the correct parameters)'
def responseEntity = new ResponseEntity<>(new SubJobWriteResponse('my-sub-job-id', 'dmi1', 'prod1'), HttpStatus.OK)
def expectedJson = '{"dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}'
- mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, NO_AUTH) >> responseEntity
+ mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, authorization) >> responseEntity
when: 'sending request to DMI invoked'
- objectUnderTest.sendRequestsToDmi(dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey)
+ objectUnderTest.sendRequestsToDmi(authorization, dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey)
then: 'the result contains the expected sub-job id'
assert responseEntity.body.subJobId == 'my-sub-job-id'
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy
index 3dadf23249..3444d7b86a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/dmi/DmiRestClientSpec.groovy
@@ -163,4 +163,16 @@ class DmiRestClientSpec extends Specification {
'DMI basic auth disabled, with NCMP bearer token' | false | BEARER_AUTH_HEADER || BEARER_AUTH_HEADER
'DMI basic auth disabled, with NCMP basic auth' | false | BASIC_AUTH_HEADER || NO_AUTH_HEADER
}
+
+ def 'DMI GET Operation for DMI Data Service '() {
+ given: 'the Data web client returns a valid response entity for the expected parameters'
+ mockDataServicesWebClient.get() >> mockRequestBody
+ def jsonNode = jsonObjectMapper.convertJsonString('{"status":"some status"}', JsonNode.class)
+ ((ObjectNode) jsonNode).put('status', 'some status')
+ mockResponse.bodyToMono(JsonNode.class) >> Mono.just(jsonNode)
+ when: 'GET operation is invoked for Data Service'
+ def response = objectUnderTest.getDataJobStatus(urlTemplateParameters, NO_AUTH_HEADER).block()
+ then: 'the response equals to the expected value'
+ assert response == 'some status'
+ }
}