aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
authorhalil.cakal <halil.cakal@est.tech>2023-07-13 11:28:18 +0100
committerhalil.cakal <halil.cakal@est.tech>2023-07-26 17:03:21 +0100
commitd789956fbf88f856472f975487c1975df91dbe3e (patch)
tree5969f9432199e0689d3bd150645a774ecefa3ac0 /cps-ncmp-service
parentdcf84ad73f0301ef41049e692b9963f6dcac3661 (diff)
Subscription Creation: NCMP to Client CloudEvent transformation
- Delete legacy avc subscription event and event outcome schemas - Change subscription response and outcome sample json file contents - Change ncmp event response code to support avc subscriptions - Add mapper that maps cloud event to subscription event response - Add mapper that maps subscription event outcome to cloud event - Change subscription event response consumer to consume CloudEvents - Change time out task to support response event instead client id and name - Change subscription event response mapper to support Cloud Event - Change subscription outcome mapper to group subscription responses as per details and status - Change subscription status to have fromString functionality - Change all unit tests to support new functionalities - Add cps exceptions for cloud event and outcome type - Add details field in yang model - Change data node helper to supoort details field - Consolidate final subscription response codes - Fix code smells reported by SonarLint Issue-ID: CPS-1739 Change-Id: I5eadeb8ef40d3d7befb762b5a8d2139fe3c85d7e Signed-off-by: halil.cakal <halil.cakal@est.tech>
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java9
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java9
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java39
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java29
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java132
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java109
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java92
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java40
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java61
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java22
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java57
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java63
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java9
-rw-r--r--cps-ncmp-service/src/main/resources/model/subscription.yang4
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy25
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumerSpec.groovy120
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy14
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy116
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapperSpec.groovy50
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy6
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy39
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy26
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml2
-rw-r--r--cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json49
-rw-r--r--cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json33
-rw-r--r--cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent2.json20
31 files changed, 793 insertions, 471 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
index d250c36a80..3b11249838 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
@@ -26,10 +26,14 @@ import lombok.Getter;
public enum NcmpEventResponseCode {
SUCCESS("0", "Successfully applied changes"),
+ SUCCESSFULLY_APPLIED_SUBSCRIPTION("1", "successfully applied subscription"),
CM_HANDLES_NOT_FOUND("100", "cm handle id(s) not found"),
CM_HANDLES_NOT_READY("101", "cm handle(s) not ready"),
DMI_SERVICE_NOT_RESPONDING("102", "dmi plugin service is not responding"),
- UNABLE_TO_READ_RESOURCE_DATA("103", "dmi plugin service is not able to read resource data");
+ UNABLE_TO_READ_RESOURCE_DATA("103", "dmi plugin service is not able to read resource data"),
+ PARTIALLY_APPLIED_SUBSCRIPTION("104", "partially applied subscription"),
+ SUBSCRIPTION_NOT_APPLICABLE("105", "subscription not applicable for all cm handles"),
+ SUBSCRIPTION_PENDING("106", "subscription pending for all cm handles");
private final String statusCode;
private final String statusMessage;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
index c178700eed..176e644bae 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
@@ -24,6 +24,7 @@ import com.hazelcast.map.IMap;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
@Slf4j
@RequiredArgsConstructor
@@ -31,8 +32,7 @@ public class ResponseTimeoutTask implements Runnable {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
- private final String subscriptionClientId;
- private final String subscriptionName;
+ private final SubscriptionEventResponse subscriptionEventResponse;
@Override
public void run() {
@@ -47,9 +47,12 @@ public class ResponseTimeoutTask implements Runnable {
}
private void generateAndSendResponse() {
+ final String subscriptionClientId = subscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
- subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse,
+ "subscriptionCreatedStatus");
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
index f511965c77..5afc52d7e7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
@@ -58,22 +58,23 @@ public class SubscriptionEventConsumer {
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
+ final String eventType = subscriptionEventConsumerRecord.value().getType();
final SubscriptionEvent subscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent);
final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore();
- if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) {
+ if (!eventDatastore.equals("passthrough-running")) {
throw new OperationNotYetSupportedException(
- "passthrough datastores are currently only supported for event subscriptions");
+ "passthrough-running datastores are currently only supported for event subscriptions");
}
if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) {
if (subscriptionModelLoaderEnabled) {
persistSubscriptionEvent(subscriptionEvent);
}
- if ("CREATE".equals(cloudEvent.getType())) {
+ if ("subscriptionCreated".equals(cloudEvent.getType())) {
log.info("Subscription for ClientID {} with name {} ...",
subscriptionEvent.getData().getSubscription().getClientID(),
subscriptionEvent.getData().getSubscription().getName());
if (notificationFeatureEnabled) {
- subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent);
+ subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, eventType);
}
}
} else {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 1fe963a279..f196cb01e9 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -44,6 +44,8 @@ import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
import org.springframework.beans.factory.annotation.Value;
@@ -74,7 +76,7 @@ public class SubscriptionEventForwarder {
*
* @param subscriptionEvent the event to be forwarded
*/
- public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, final String eventType) {
final List<String> cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets();
if (cmHandleTargets == null || cmHandleTargets.isEmpty()
|| cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) {
@@ -85,13 +87,19 @@ public class SubscriptionEventForwarder {
inventoryPersistence.getYangModelCmHandles(cmHandleTargets);
final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
= DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
- findDmisAndRespond(subscriptionEvent, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
+ findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
}
- private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent,
+ private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType,
final List<String> cmHandleTargetsAsStrings,
final Map<String, Map<String, Map<String, String>>>
dmiPropertiesPerCmHandleIdPerServiceName) {
+ final SubscriptionEventResponse emptySubscriptionEventResponse =
+ new SubscriptionEventResponse().withData(new Data());
+ emptySubscriptionEventResponse.getData().setSubscriptionName(
+ subscriptionEvent.getData().getSubscription().getName());
+ emptySubscriptionEventResponse.getData().setClientId(
+ subscriptionEvent.getData().getSubscription().getClientID());
final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
.map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
@@ -104,27 +112,27 @@ public class SubscriptionEventForwarder {
updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
}
if (dmisToRespond.isEmpty()) {
- final String clientID = subscriptionEvent.getData().getSubscription().getClientID();
- final String subscriptionName = subscriptionEvent.getData().getSubscription().getName();
- subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse,
+ "subscriptionCreatedStatus");
} else {
- startResponseTimeout(subscriptionEvent, dmisToRespond);
+ startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond);
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent =
clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent);
- forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType);
}
}
- private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
- final String subscriptionClientId = subscriptionEvent.getData().getSubscription().getClientID();
- final String subscriptionName = subscriptionEvent.getData().getSubscription().getName();
+ private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse,
+ final Set<String> dmisToRespond) {
+ final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
final ResponseTimeoutTask responseTimeoutTask =
new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome,
- subscriptionClientId, subscriptionName);
+ emptySubscriptionEventResponse);
try {
executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
} catch (final RuntimeException ex) {
@@ -135,7 +143,7 @@ public class SubscriptionEventForwarder {
private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
- ncmpSubscriptionEvent) {
+ ncmpSubscriptionEvent, final String eventType) {
dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map(
cmHandleAndProperties -> {
@@ -150,7 +158,7 @@ public class SubscriptionEventForwarder {
final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName;
final CloudEvent ncmpSubscriptionCloudEvent =
- SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey);
+ SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType);
eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent);
});
}
@@ -182,6 +190,7 @@ public class SubscriptionEventForwarder {
return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream()
.filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId()))
.map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(),
- SubscriptionStatus.REJECTED)).collect(Collectors.toList());
+ SubscriptionStatus.REJECTED, "Targets not found"))
+ .collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java
index bf9ceb1c3d..35d94cc7a2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java
@@ -25,7 +25,6 @@ import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
@@ -47,8 +46,7 @@ public interface SubscriptionEventMapper {
*/
@Named("mapTargetsToCmHandleTargets")
default List<YangModelSubscriptionEvent.TargetCmHandle> mapTargetsToCmHandleTargets(List<String> targets) {
- return targets.stream().map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target,
- SubscriptionStatus.PENDING))
+ return targets.stream().map(YangModelSubscriptionEvent.TargetCmHandle::new)
.collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
index 20df706c07..ddb9fd6fcf 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import com.hazelcast.map.IMap;
+import io.cloudevents.CloudEvent;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -32,8 +33,9 @@ import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEven
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.spi.model.DataNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
@@ -61,19 +63,21 @@ public class SubscriptionEventResponseConsumer {
* @param subscriptionEventResponseConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEventResponse(
- final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
- final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
- final String clientId = subscriptionEventResponse.getClientId();
+ final ConsumerRecord<String, CloudEvent> subscriptionEventResponseConsumerRecord) {
+ final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value();
+ final String eventType = subscriptionEventResponseConsumerRecord.value().getType();
+ final SubscriptionEventResponse subscriptionEventResponse =
+ SubscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent);
+ final String clientId = subscriptionEventResponse.getData().getClientId();
log.info("subscription event response of clientId: {} is received.", clientId);
- final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = clientId + subscriptionName;
boolean createOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
-
- dmiNames.remove(subscriptionEventResponse.getDmiName());
+ dmiNames.remove(subscriptionEventResponse.getData().getDmiName());
forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
@@ -84,7 +88,7 @@ public class SubscriptionEventResponseConsumer {
if (createOutcomeResponse
&& notificationFeatureEnabled
&& hasNoPendingCmHandles(clientId, subscriptionName)) {
- subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType);
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
@@ -92,10 +96,15 @@ public class SubscriptionEventResponseConsumer {
private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) {
final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent(
clientId, subscriptionName);
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes(
- dataNodeSubscription);
- return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal =
+ DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription);
+ for (final Map<String, String> statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) {
+ final String status = statusAndDetailsMap.get("status");
+ if (SubscriptionStatus.PENDING.toString().equals(status)) {
+ return false;
+ }
+ }
+ return true;
}
private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java
index 44181c57c9..dc122ee5d1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java
@@ -21,36 +21,35 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
@Mapper(componentModel = "spring")
public interface SubscriptionEventResponseMapper {
- @Mapping(source = "clientId", target = "clientId")
- @Mapping(source = "subscriptionName", target = "subscriptionName")
- @Mapping(source = "cmHandleIdToStatus", target = "predicates.targetCmHandles",
- qualifiedByName = "mapStatusToCmHandleTargets")
+ @Mapping(source = "data.clientId", target = "clientId")
+ @Mapping(source = "data.subscriptionName", target = "subscriptionName")
+ @Mapping(source = "data.subscriptionStatus", target = "predicates.targetCmHandles",
+ qualifiedByName = "mapSubscriptionStatusToCmHandleTargets")
YangModelSubscriptionEvent toYangModelSubscriptionEvent(
SubscriptionEventResponse subscriptionEventResponse);
/**
- * Maps StatusToCMHandle to list of TargetCmHandle.
+ * Maps SubscriptionStatus to list of TargetCmHandle.
*
- * @param targets as a map
+ * @param subscriptionStatus as a list
* @return TargetCmHandle list
*/
- @Named("mapStatusToCmHandleTargets")
- default List<YangModelSubscriptionEvent.TargetCmHandle> mapStatusToCmHandleTargets(
- Map<String, SubscriptionStatus> targets) {
- return targets.entrySet().stream().map(target ->
- new YangModelSubscriptionEvent.TargetCmHandle(target.getKey(), target.getValue())).collect(
- Collectors.toList());
+ @Named("mapSubscriptionStatusToCmHandleTargets")
+ default List<YangModelSubscriptionEvent.TargetCmHandle> mapSubscriptionStatusToCmHandleTargets(
+ List<SubscriptionStatus> subscriptionStatus) {
+ return subscriptionStatus.stream().map(status -> new YangModelSubscriptionEvent.TargetCmHandle(status.getId(),
+ org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus.fromString(status.getStatus().value()),
+ status.getDetails())).collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
index 8fdff17944..9ed686529d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
@@ -20,21 +20,20 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
-import java.io.Serializable;
-import java.util.Collection;
+import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.onap.cps.ncmp.api.NcmpEventResponseCode;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome;
-import org.onap.cps.spi.model.DataNode;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -45,75 +44,106 @@ public class SubscriptionEventResponseOutcome {
private final SubscriptionPersistence subscriptionPersistence;
- private final EventsPublisher<SubscriptionEventOutcome> outcomeEventsPublisher;
+ private final EventsPublisher<CloudEvent> outcomeEventsPublisher;
private final SubscriptionOutcomeMapper subscriptionOutcomeMapper;
- @Value("${app.ncmp.avc.subscription-outcome-topic:cm-avc-subscription-response}")
+ @Value("${app.ncmp.avc.subscription-outcome-topic:subscription-response}")
private String subscriptionOutcomeEventTopic;
/**
* This is for construction of outcome message to be published for client apps.
*
- * @param subscriptionClientId client id of the subscription.
- * @param subscriptionName name of the subscription.
+ * @param subscriptionEventResponse event produced by Dmi Plugin
*/
- public void sendResponse(final String subscriptionClientId, final String subscriptionName) {
- final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse(
- subscriptionClientId, subscriptionName);
- final Headers headers = new RecordHeaders();
+ public void sendResponse(final SubscriptionEventResponse subscriptionEventResponse, final String eventKey) {
+ final SubscriptionEventOutcome subscriptionEventOutcome =
+ formSubscriptionOutcomeMessage(subscriptionEventResponse);
+ final String subscriptionClientId = subscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
- outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic,
- subscriptionEventId, headers, subscriptionEventOutcome);
+ final CloudEvent subscriptionOutcomeCloudEvent =
+ SubscriptionOutcomeCloudMapper.toCloudEvent(subscriptionEventOutcome,
+ subscriptionEventId, eventKey);
+ outcomeEventsPublisher.publishCloudEvent(subscriptionOutcomeEventTopic,
+ subscriptionEventId, subscriptionOutcomeCloudEvent);
}
- private SubscriptionEventOutcome generateResponse(final String subscriptionClientId,
- final String subscriptionName) {
- final Collection<DataNode> dataNodes =
- subscriptionPersistence.getCmHandlesForSubscriptionEvent(subscriptionClientId, subscriptionName);
- final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
- final List<Collection<Serializable>> cmHandleIdToStatus =
- DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
- return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName,
- isFullOutcomeResponse(cmHandleIdToStatusMap));
+ private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
+ final SubscriptionEventResponse subscriptionEventResponse) {
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap =
+ DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(
+ subscriptionPersistence.getCmHandlesForSubscriptionEvent(
+ subscriptionEventResponse.getData().getClientId(),
+ subscriptionEventResponse.getData().getSubscriptionName()));
+ final List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
+ subscriptionStatusList = mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(
+ cmHandleIdToStatusAndDetailsAsMap);
+ subscriptionEventResponse.getData().setSubscriptionStatus(subscriptionStatusList);
+ return fromSubscriptionEventResponse(subscriptionEventResponse,
+ decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap));
}
- private boolean isFullOutcomeResponse(final Map<String, SubscriptionStatus> cmHandleIdToStatusMap) {
- return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ private static List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
+ mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) {
+ return cmHandleIdToStatusAndDetailsAsMap.entrySet()
+ .stream().map(entryset -> {
+ final org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
+ subscriptionStatus = new org.onap.cps.ncmp.events.avcsubscription1_0_0
+ .dmi_to_ncmp.SubscriptionStatus();
+ final String cmHandleId = entryset.getKey();
+ final Map<String, String> statusAndDetailsMap = entryset.getValue();
+ final String status = statusAndDetailsMap.get("status");
+ final String details = statusAndDetailsMap.get("details");
+ subscriptionStatus.setId(cmHandleId);
+ subscriptionStatus.setStatus(
+ org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp
+ .SubscriptionStatus.Status.fromValue(status));
+ subscriptionStatus.setDetails(details);
+ return subscriptionStatus;
+ }).collect(Collectors.toList());
}
- private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
- final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
- final String subscriptionName, final boolean isFullOutcomeResponse) {
+ private NcmpEventResponseCode decideOnNcmpEventResponseCodeForSubscription(
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) {
- final SubscriptionEventResponse subscriptionEventResponse = toSubscriptionEventResponse(
- cmHandleIdToStatus, subscriptionClientId, subscriptionName);
+ final boolean isAllTargetsPending = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap,
+ SubscriptionStatus.PENDING);
- final SubscriptionEventOutcome subscriptionEventOutcome =
- subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse);
+ final boolean isAllTargetsRejected = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap,
+ SubscriptionStatus.REJECTED);
+
+ final boolean isAllTargetsAccepted = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap,
+ SubscriptionStatus.ACCEPTED);
- if (isFullOutcomeResponse) {
- subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME);
+ if (isAllTargetsAccepted) {
+ return NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION;
+ } else if (isAllTargetsRejected) {
+ return NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE;
+ } else if (isAllTargetsPending) {
+ return NcmpEventResponseCode.SUBSCRIPTION_PENDING;
} else {
- subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME);
+ return NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION;
}
+ }
- return subscriptionEventOutcome;
+ private boolean isAllTargetCmHandleStatusMatch(
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap,
+ final SubscriptionStatus subscriptionStatus) {
+ return cmHandleIdToStatusAndDetailsAsMap.values().stream()
+ .allMatch(entryset -> entryset.containsValue(subscriptionStatus.toString()));
}
- private SubscriptionEventResponse toSubscriptionEventResponse(
- final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
- final String subscriptionName) {
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
+ private SubscriptionEventOutcome fromSubscriptionEventResponse(
+ final SubscriptionEventResponse subscriptionEventResponse,
+ final NcmpEventResponseCode ncmpEventResponseCode) {
- final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
- subscriptionEventResponse.setClientId(subscriptionClientId);
- subscriptionEventResponse.setSubscriptionName(subscriptionName);
- subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap);
+ final SubscriptionEventOutcome subscriptionEventOutcome =
+ subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse);
+ subscriptionEventOutcome.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode()));
+ subscriptionEventOutcome.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage());
- return subscriptionEventResponse;
+ return subscriptionEventOutcome;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java
index cecde5f816..7803b982f3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java
@@ -26,63 +26,80 @@ import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfo;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.spi.exceptions.DataValidationException;
@Mapper(componentModel = "spring")
public interface SubscriptionOutcomeMapper {
- @Mapping(source = "clientId", target = "event.subscription.clientID")
- @Mapping(source = "subscriptionName", target = "event.subscription.name")
- @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets",
- qualifiedByName = "mapStatusToCmHandleRejected")
- @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets",
- qualifiedByName = "mapStatusToCmHandleAccepted")
- @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets",
- qualifiedByName = "mapStatusToCmHandlePending")
- SubscriptionEventOutcome toSubscriptionEventOutcome(
- SubscriptionEventResponse subscriptionEventResponse);
+ @Mapping(source = "data.subscriptionStatus", target = "data.additionalInfo",
+ qualifiedByName = "mapListOfSubscriptionStatusToAdditionalInfo")
+ SubscriptionEventOutcome toSubscriptionEventOutcome(SubscriptionEventResponse subscriptionEventResponse);
/**
- * Maps StatusToCMHandle to list of TargetCmHandle rejected.
+ * Maps list of SubscriptionStatus to an AdditionalInfo.
*
- * @param targets as a map
- * @return TargetCmHandle list
+ * @param subscriptionStatusList containing details
+ * @return an AdditionalInfo
*/
- @Named("mapStatusToCmHandleRejected")
- default List<Object> mapStatusToCmHandleRejected(Map<String, SubscriptionStatus> targets) {
- return targets.entrySet()
- .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ @Named("mapListOfSubscriptionStatusToAdditionalInfo")
+ default AdditionalInfo mapListOfSubscriptionStatusToAdditionalInfo(
+ final List<SubscriptionStatus> subscriptionStatusList) {
+ if (subscriptionStatusList == null || subscriptionStatusList.isEmpty()) {
+ throw new DataValidationException("Invalid subscriptionStatusList",
+ "SubscriptionStatus list cannot be null or empty");
+ }
+
+ final Map<String, List<SubscriptionStatus>> rejectedSubscriptionsPerDetails = getSubscriptionsPerDetails(
+ subscriptionStatusList, SubscriptionStatus.Status.REJECTED);
+ final Map<String, List<String>> rejectedCmHandlesPerDetails =
+ getCmHandlesPerDetails(rejectedSubscriptionsPerDetails);
+ final List<AdditionalInfoDetail> rejectedCmHandles = getAdditionalInfoDetailList(rejectedCmHandlesPerDetails);
+
+
+ final Map<String, List<SubscriptionStatus>> pendingSubscriptionsPerDetails = getSubscriptionsPerDetails(
+ subscriptionStatusList, SubscriptionStatus.Status.PENDING);
+ final Map<String, List<String>> pendingCmHandlesPerDetails =
+ getCmHandlesPerDetails(pendingSubscriptionsPerDetails);
+ final List<AdditionalInfoDetail> pendingCmHandles = getAdditionalInfoDetailList(pendingCmHandlesPerDetails);
+
+ final AdditionalInfo additionalInfo = new AdditionalInfo();
+ additionalInfo.setRejected(rejectedCmHandles);
+ additionalInfo.setPending(pendingCmHandles);
+
+ return additionalInfo;
}
- /**
- * Maps StatusToCMHandle to list of TargetCmHandle accepted.
- *
- * @param targets as a map
- * @return TargetCmHandle list
- */
- @Named("mapStatusToCmHandleAccepted")
- default List<Object> mapStatusToCmHandleAccepted(Map<String, SubscriptionStatus> targets) {
- return targets.entrySet()
- .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ private static Map<String, List<SubscriptionStatus>> getSubscriptionsPerDetails(
+ final List<SubscriptionStatus> subscriptionStatusList, final SubscriptionStatus.Status status) {
+ return subscriptionStatusList.stream()
+ .filter(subscriptionStatus -> subscriptionStatus.getStatus() == status)
+ .collect(Collectors.groupingBy(SubscriptionStatus::getDetails));
}
- /**
- * Maps StatusToCMHandle to list of TargetCmHandle pending.
- *
- * @param targets as a map
- * @return TargetCmHandle list
- */
- @Named("mapStatusToCmHandlePending")
- default List<Object> mapStatusToCmHandlePending(Map<String, SubscriptionStatus> targets) {
- return targets.entrySet()
- .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ private static Map<String, List<String>> getCmHandlesPerDetails(
+ final Map<String, List<SubscriptionStatus>> subscriptionsPerDetails) {
+ return subscriptionsPerDetails.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().stream()
+ .map(SubscriptionStatus::getId)
+ .collect(Collectors.toList())
+ ));
+ }
+
+ private static List<AdditionalInfoDetail> getAdditionalInfoDetailList(
+ final Map<String, List<String>> cmHandlesPerDetails) {
+ return cmHandlesPerDetails.entrySet().stream()
+ .map(entry -> {
+ final AdditionalInfoDetail detail = new AdditionalInfoDetail();
+ detail.setDetails(entry.getKey());
+ detail.setTargets(entry.getValue());
+ return detail;
+ }).collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
index d2b1237a4d..83a375b1b8 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
@@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.impl.subscriptions;
import static org.onap.cps.ncmp.api.impl.constants.DmiRegistryConstants.NO_TIMESTAMP;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -70,33 +69,46 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
private void findDeltaCmHandlesAddOrUpdateInDatabase(final YangModelSubscriptionEvent yangModelSubscriptionEvent,
final String clientId, final String subscriptionName,
final Collection<DataNode> dataNodes) {
- final Map<String, SubscriptionStatus> cmHandleIdsFromYangModel =
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapNew =
extractCmHandleFromYangModelAsMap(yangModelSubscriptionEvent);
- final Map<String, SubscriptionStatus> cmHandleIdsFromDatabase =
- extractCmHandleFromDbAsMap(dataNodes);
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal =
+ DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodes);
- final Map<String, SubscriptionStatus> newCmHandles =
- mapDifference(cmHandleIdsFromYangModel, cmHandleIdsFromDatabase);
- traverseCmHandleList(newCmHandles, clientId, subscriptionName, true);
+ final Map<String, Map<String, String>> newTargetCmHandles =
+ mapDifference(cmHandleIdToStatusAndDetailsAsMapNew,
+ cmHandleIdToStatusAndDetailsAsMapOriginal);
+ traverseCmHandleList(newTargetCmHandles, clientId, subscriptionName, true);
- final Map<String, SubscriptionStatus> existingCmHandles =
- mapDifference(cmHandleIdsFromYangModel, newCmHandles);
- traverseCmHandleList(existingCmHandles, clientId, subscriptionName, false);
+ final Map<String, Map<String, String>> existingTargetCmHandles =
+ mapDifference(cmHandleIdToStatusAndDetailsAsMapNew, newTargetCmHandles);
+ traverseCmHandleList(existingTargetCmHandles, clientId, subscriptionName, false);
}
- private boolean isSubscriptionRegistryEmptyOrNonExist(final Collection<DataNode> dataNodes,
- final String clientId, final String subscriptionName) {
- final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst();
- return ((dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty())
- || getCmHandlesForSubscriptionEvent(clientId, subscriptionName).isEmpty());
- }
-
- private void traverseCmHandleList(final Map<String, SubscriptionStatus> cmHandleMap,
+ private static Map<String, Map<String, String>> extractCmHandleFromYangModelAsMap(
+ final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
+ return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles()
+ .stream().collect(
+ HashMap<String, Map<String, String>>::new,
+ (result, cmHandle) -> {
+ final String cmHandleId = cmHandle.getCmHandleId();
+ final SubscriptionStatus status = cmHandle.getStatus();
+ final String details = cmHandle.getDetails();
+
+ if (cmHandleId != null && status != null) {
+ result.put(cmHandleId, new HashMap<>());
+ result.get(cmHandleId).put("status", status.toString());
+ result.get(cmHandleId).put("details", details == null ? "" : details);
+ }
+ },
+ HashMap::putAll
+ );
+ }
+
+ private void traverseCmHandleList(final Map<String, Map<String, String>> cmHandleMap,
final String clientId,
final String subscriptionName,
final boolean isAddListElementOperation) {
- final List<YangModelSubscriptionEvent.TargetCmHandle> cmHandleList =
- targetCmHandlesAsList(cmHandleMap);
+ final List<YangModelSubscriptionEvent.TargetCmHandle> cmHandleList = targetCmHandlesAsList(cmHandleMap);
for (final YangModelSubscriptionEvent.TargetCmHandle targetCmHandle : cmHandleList) {
final String targetCmHandleAsJson =
createTargetCmHandleJsonData(jsonObjectMapper.asJsonString(targetCmHandle));
@@ -105,6 +117,13 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
}
}
+ private boolean isSubscriptionRegistryEmptyOrNonExist(final Collection<DataNode> dataNodes,
+ final String clientId, final String subscriptionName) {
+ final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst();
+ return ((dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty())
+ || getCmHandlesForSubscriptionEvent(clientId, subscriptionName).isEmpty());
+ }
+
private void addOrReplaceCmHandlePredicateListElement(final String targetCmHandleAsJson,
final String clientId,
final String subscriptionName,
@@ -142,25 +161,16 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
}
- private static Map<String, SubscriptionStatus> extractCmHandleFromDbAsMap(final Collection<DataNode> dataNodes) {
- final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
- final List<Collection<Serializable>> cmHandleIdToStatus = DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
- return DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
- }
-
- private static Map<String, SubscriptionStatus> extractCmHandleFromYangModelAsMap(
- final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
- return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles()
- .stream().collect(Collectors.toMap(
- YangModelSubscriptionEvent.TargetCmHandle::getCmHandleId,
- YangModelSubscriptionEvent.TargetCmHandle::getStatus));
- }
-
private static List<YangModelSubscriptionEvent.TargetCmHandle> targetCmHandlesAsList(
- final Map<String, SubscriptionStatus> newCmHandles) {
- return newCmHandles.entrySet().stream().map(entry ->
- new YangModelSubscriptionEvent.TargetCmHandle(entry.getKey(),
- entry.getValue())).collect(Collectors.toList());
+ final Map<String, Map<String, String>> newCmHandles) {
+ return newCmHandles.entrySet().stream().map(entry -> {
+ final String cmHandleId = entry.getKey();
+ final Map<String, String> statusAndDetailsMap = entry.getValue();
+ final String status = statusAndDetailsMap.get("status");
+ final String details = statusAndDetailsMap.get("details");
+ return new YangModelSubscriptionEvent.TargetCmHandle(cmHandleId,
+ SubscriptionStatus.fromString(status), details);
+ }).collect(Collectors.toList());
}
private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) {
@@ -181,9 +191,9 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
+ "' and @subscriptionName='" + subscriptionName + "']";
}
- private static <K, V> Map<K, V> mapDifference(final Map<? extends K, ? extends V> left,
- final Map<? extends K, ? extends V> right) {
- final Map<K, V> difference = new HashMap<>();
+ private static <K, L, M> Map<K, Map<L, M>> mapDifference(final Map<K, Map<L, M>> left,
+ final Map<K, Map<L, M>> right) {
+ final Map<K, Map<L, M>> difference = new HashMap<>();
difference.putAll(left);
difference.putAll(right);
difference.entrySet().removeAll(right.entrySet());
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
index ce3b88ba03..63ab102d14 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
@@ -20,36 +20,30 @@
package org.onap.cps.ncmp.api.impl.subscriptions;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Map;
public enum SubscriptionStatus {
- ACCEPTED,
- REJECTED,
- PENDING;
+ ACCEPTED("ACCEPTED"),
+ REJECTED("REJECTED"),
+ PENDING("PENDING");
+ private final String subscriptionStatusValue;
+
+ SubscriptionStatus(final String subscriptionStatusValue) {
+ this.subscriptionStatusValue = subscriptionStatusValue;
+ }
/**
- * Populates a map with a key of cm handle id and a value of subscription status.
+ * Finds the value of the given enum.
*
- * @param resultMap the map is being populated
- * @param bucketIterator to iterate over the collection
+ * @param statusValue value of the enum
+ * @return a SubscriptionStatus
*/
- public static void populateCmHandleToSubscriptionStatusMap(final Map<String, SubscriptionStatus> resultMap,
- final Iterator<Serializable> bucketIterator) {
- final String item = (String) bucketIterator.next();
- if ("PENDING".equals(item)) {
- resultMap.put((String) bucketIterator.next(),
- SubscriptionStatus.PENDING);
- }
- if ("REJECTED".equals(item)) {
- resultMap.put((String) bucketIterator.next(),
- SubscriptionStatus.REJECTED);
- }
- if ("ACCEPTED".equals(item)) {
- resultMap.put((String) bucketIterator.next(),
- SubscriptionStatus.ACCEPTED);
+ public static SubscriptionStatus fromString(final String statusValue) {
+ for (final SubscriptionStatus subscriptionStatusType : SubscriptionStatus.values()) {
+ if (subscriptionStatusType.subscriptionStatusValue.equalsIgnoreCase(statusValue)) {
+ return subscriptionStatusType;
+ }
}
+ return null;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
index f42a378fcb..c032d1e8a4 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
@@ -23,14 +23,12 @@ package org.onap.cps.ncmp.api.impl.utils;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.spi.model.DataNode;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -50,8 +48,8 @@ public class DataNodeHelper {
/**
* The leaves for each DataNode is listed as map.
*
- * @param dataNodes as collection.
- * @return list of map for the all leaves.
+ * @param dataNodes as collection
+ * @return list of map for the all leaves
*/
public static List<Map<String, Serializable>> getDataNodeLeaves(final Collection<DataNode> dataNodes) {
return dataNodes.stream()
@@ -61,47 +59,42 @@ public class DataNodeHelper {
}
/**
- * The cm handle and status is listed as a collection.
+ * Extracts the mapping of cm handle id to status with details from nodes leaves.
*
- * @param dataNodeLeaves as a list of map.
- * @return list of collection containing cm handle id and statuses.
+ * @param dataNodeLeaves as a list of map
+ * @return cm handle id to status and details mapping
*/
- public static List<Collection<Serializable>> getCmHandleIdToStatus(
+ public static Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap(
final List<Map<String, Serializable>> dataNodeLeaves) {
return dataNodeLeaves.stream()
- .map(Map::values)
- .filter(col -> col.contains("PENDING")
- || col.contains("ACCEPTED")
- || col.contains("REJECTED"))
- .collect(Collectors.toList());
- }
+ .filter(entryset -> entryset.values().contains("PENDING")
+ || entryset.values().contains("ACCEPTED")
+ || entryset.values().contains("REJECTED"))
+ .collect(
+ HashMap<String, Map<String, String>>::new,
+ (result, entry) -> {
+ final String cmHandleId = (String) entry.get("cmHandleId");
+ final String status = (String) entry.get("status");
+ final String details = (String) entry.get("details");
- /**
- * The cm handle and status is returned as a map.
- *
- * @param cmHandleIdToStatus as a list of collection
- * @return a map of cm handle id to status
- */
- public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMap(
- final List<Collection<Serializable>> cmHandleIdToStatus) {
- final Map<String, SubscriptionStatus> resultMap = new HashMap<>();
- for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) {
- final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator();
- while (bucketIterator.hasNext()) {
- SubscriptionStatus.populateCmHandleToSubscriptionStatusMap(resultMap, bucketIterator);
- }
- }
- return resultMap;
+ if (cmHandleId != null && status != null) {
+ result.put(cmHandleId, new HashMap<>());
+ result.get(cmHandleId).put("status", status);
+ result.get(cmHandleId).put("details", details == null ? "" : details);
+ }
+ },
+ HashMap::putAll
+ );
}
/**
- * Extracts the mapping of cm handle id to status from data node collection.
+ * Extracts the mapping of cm handle id to status with details from data node collection.
*
* @param dataNodes as a collection
- * @return cm handle id to status mapping
+ * @return cm handle id to status and details mapping
*/
- public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMapFromDataNodes(
+ public static Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapFromDataNode(
final Collection<DataNode> dataNodes) {
- return getCmHandleIdToStatusMap(getCmHandleIdToStatus(getDataNodeLeaves(dataNodes)));
+ return cmHandleIdToStatusAndDetailsAsMap(getDataNodeLeaves(dataNodes));
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java
index a7de479046..df3998fe80 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java
@@ -27,10 +27,12 @@ import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.PojoCloudEventData;
import io.cloudevents.jackson.PojoCloudEventDataMapper;
import java.net.URI;
+import java.util.UUID;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.spi.exceptions.CloudEventConstructionException;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
@@ -38,6 +40,8 @@ public class SubscriptionEventCloudMapper {
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static String randomId = UUID.randomUUID().toString();
+
/**
* Maps CloudEvent object to SubscriptionEvent.
*
@@ -62,18 +66,24 @@ public class SubscriptionEventCloudMapper {
*
* @param ncmpSubscriptionEvent object.
* @param eventKey as String.
- * @return CloudEvent builded.
+ * @return CloudEvent built.
*/
public static CloudEvent toCloudEvent(
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent,
- final String eventKey) {
+ final String eventKey, final String eventType) {
try {
return CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent))
- .withId(eventKey).withType("CREATE").withSource(
- URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID())).build();
+ .withId(randomId)
+ .withSource(URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID()))
+ .withType(eventType)
+ .withExtension("correlationid", eventKey)
+ .withDataSchema(URI.create("urn:cps:"
+ + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi
+ .SubscriptionEvent.class.getName() + ":1.0.0"))
+ .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent)).build();
} catch (final Exception ex) {
- throw new RuntimeException("The Cloud Event could not be constructed.", ex);
+ throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to "
+ + "serialize or required headers is missing", ex);
}
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java
new file mode 100644
index 0000000000..17aba65cf7
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.data.PojoCloudEventData;
+import io.cloudevents.jackson.PojoCloudEventDataMapper;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public class SubscriptionEventResponseCloudMapper {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Maps CloudEvent object to SubscriptionEventResponse.
+ *
+ * @param cloudEvent object
+ * @return SubscriptionEventResponse deserialized
+ */
+ public static SubscriptionEventResponse toSubscriptionEventResponse(final CloudEvent cloudEvent) {
+ final PojoCloudEventData<SubscriptionEventResponse> deserializedCloudEvent = CloudEventUtils
+ .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEventResponse.class));
+ if (deserializedCloudEvent == null) {
+ log.debug("No data found in the consumed subscription response event");
+ return null;
+ } else {
+ final SubscriptionEventResponse subscriptionEventResponse = deserializedCloudEvent.getValue();
+ log.debug("Consuming subscription response event {}", subscriptionEventResponse);
+ return subscriptionEventResponse;
+ }
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java
new file mode 100644
index 0000000000..92c5656121
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.UUID;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.spi.exceptions.CloudEventConstructionException;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public class SubscriptionOutcomeCloudMapper {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private static String randomId = UUID.randomUUID().toString();
+
+ /**
+ * Maps SubscriptionEventOutcome to a CloudEvent.
+ *
+ * @param subscriptionEventOutcome object
+ * @return CloudEvent
+ */
+ public static CloudEvent toCloudEvent(final SubscriptionEventOutcome subscriptionEventOutcome,
+ final String eventKey, final String eventType) {
+ try {
+ return CloudEventBuilder.v1()
+ .withId(randomId)
+ .withSource(URI.create("NCMP"))
+ .withType(eventType)
+ .withExtension("correlationid", eventKey)
+ .withDataSchema(URI.create("urn:cps:" + SubscriptionEventOutcome.class.getName() + ":1.0.0"))
+ .withData(objectMapper.writeValueAsBytes(subscriptionEventOutcome)).build();
+ } catch (final Exception ex) {
+ throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to "
+ + "serialize or required headers is missing", ex);
+ }
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java
index 4dcc5797ca..866bfd4e7b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java
@@ -81,9 +81,18 @@ public class YangModelSubscriptionEvent {
@JsonProperty()
private final SubscriptionStatus status;
+ @JsonProperty()
+ private final String details;
+
+ /**
+ * Constructor with single parameter for TargetCmHandle.
+ *
+ * @param cmHandleId as cm handle id
+ */
public TargetCmHandle(final String cmHandleId) {
this.cmHandleId = cmHandleId;
this.status = SubscriptionStatus.PENDING;
+ this.details = "Subscription forwarded to dmi plugin";
}
}
}
diff --git a/cps-ncmp-service/src/main/resources/model/subscription.yang b/cps-ncmp-service/src/main/resources/model/subscription.yang
index e332a2898a..7096c18abc 100644
--- a/cps-ncmp-service/src/main/resources/model/subscription.yang
+++ b/cps-ncmp-service/src/main/resources/model/subscription.yang
@@ -41,6 +41,10 @@ module subscription {
leaf status {
type string;
}
+
+ leaf details {
+ type string;
+ }
}
leaf datastore {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy
index d4ab1e88ad..5f6077351d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy
@@ -58,7 +58,7 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
testEventSent.getData().getDataType().setDataCategory(dataCategory)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
- .withId('some-event-id')
+ .withId('subscriptionCreated')
.withType(dataType)
.withSource(URI.create('some-resource'))
.withExtension('correlationid', 'test-cmhandle1').build()
@@ -74,34 +74,35 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
and: 'the event is persisted'
numberOfTimesToPersist * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent)
and: 'the event is forwarded'
- numberOfTimesToForward * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent)
+ numberOfTimesToForward * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent, 'subscriptionCreated')
where: 'given values are used'
- scenario | dataCategory | dataType | isNotificationEnabled | isModelLoaderEnabled || numberOfTimesToForward || numberOfTimesToPersist
- 'Both model loader and notification are enabled' | 'CM' | 'CREATE' | true | true || 1 || 1
- 'Both model loader and notification are disabled' | 'CM' | 'CREATE' | false | false || 0 || 0
- 'Model loader enabled and notification disabled' | 'CM' | 'CREATE' | false | true || 0 || 1
- 'Model loader disabled and notification enabled' | 'CM' | 'CREATE' | true | false || 1 || 0
- 'Flags are enabled but data category is FM' | 'FM' | 'CREATE' | true | true || 0 || 0
- 'Flags are enabled but data type is UPDATE' | 'CM' | 'UPDATE' | true | true || 0 || 1
+ scenario | dataCategory | dataType | isNotificationEnabled | isModelLoaderEnabled || numberOfTimesToForward || numberOfTimesToPersist
+ 'Both model loader and notification are enabled' | 'CM' | 'subscriptionCreated' | true | true || 1 || 1
+ 'Both model loader and notification are disabled' | 'CM' | 'subscriptionCreated' | false | false || 0 || 0
+ 'Model loader enabled and notification disabled' | 'CM' | 'subscriptionCreated' | false | true || 0 || 1
+ 'Model loader disabled and notification enabled' | 'CM' | 'subscriptionCreated' | true | false || 1 || 0
+ 'Flags are enabled but data category is FM' | 'FM' | 'subscriptionCreated' | true | true || 0 || 0
+ 'Flags are enabled but data type is UPDATE' | 'CM' | 'subscriptionUpdated' | true | true || 0 || 1
}
def 'Consume event with wrong datastore causes an exception'() {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
- and: 'datastore is set to a non passthrough datastore'
+ and: 'datastore is set to a passthrough-running datastore'
testEventSent.getData().getPredicates().setDatastore('operational')
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
.withId('some-event-id')
- .withType('CREATE')
+ .withType('some-event-type')
.withSource(URI.create('some-resource'))
.withExtension('correlationid', 'test-cmhandle1').build()
def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
when: 'the valid event is consumed'
objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'an operation not yet supported exception is thrown'
- thrown(OperationNotYetSupportedException)
+ def exception = thrown(OperationNotYetSupportedException)
+ exception.details == 'passthrough-running datastores are currently only supported for event subscriptions'
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
index 2af32c20e9..4343c23c96 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
@@ -35,6 +35,8 @@ import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent.TargetCm
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException
@@ -75,13 +77,6 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
- and: 'the some of the cm handles will be accepted and some of rejected'
- def cmHandlesToBeSavedInDb = [new TargetCmHandle('CMHandle1', SubscriptionStatus.ACCEPTED),
- new TargetCmHandle('CMHandle2',SubscriptionStatus.ACCEPTED),
- new TargetCmHandle('CMHandle3',SubscriptionStatus.REJECTED)]
- and: 'a yang model subscription event will be saved into the db'
- def yangModelSubscriptionEventWithAcceptedAndRejectedCmHandles = subscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent)
- yangModelSubscriptionEventWithAcceptedAndRejectedCmHandles.getPredicates().setTargetCmHandles(cmHandlesToBeSavedInDb)
and: 'the InventoryPersistence returns private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [
createYangModelCmHandleWithDmiProperty(1, 1,"shape","circle"),
@@ -92,7 +87,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, 'subscriptionCreated')
then: 'An asynchronous call is made to the blocking variable'
block.get()
then: 'the event is added to the forwarded subscription event cache'
@@ -106,8 +101,6 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
targets == [cmHandle2, cmHandle1]
}
)
- and: 'the persistence service save the yang model subscription event'
- 1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEventWithAcceptedAndRejectedCmHandles)
and: 'a separate thread has been created where the map is polled'
1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
1 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
@@ -122,7 +115,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
and: 'the target CMHandles are set to #scenario'
testEventSent.getData().getPredicates().setTargets(invalidTargets)
when: 'the event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, 'some-event-type')
then: 'an operation not yet supported exception is thrown'
thrown(OperationNotYetSupportedException)
where:
@@ -136,13 +129,17 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ and: 'a subscription event response'
+ def emptySubscriptionEventResponse = new SubscriptionEventResponse().withData(new Data());
+ emptySubscriptionEventResponse.getData().setSubscriptionName('cm-subscription-001');
+ emptySubscriptionEventResponse.getData().setClientId('SCO-9989752');
and: 'the cm handles will be rejected'
- def rejectedCmHandles = [new TargetCmHandle('CMHandle1', SubscriptionStatus.REJECTED),
- new TargetCmHandle('CMHandle2',SubscriptionStatus.REJECTED),
- new TargetCmHandle('CMHandle3',SubscriptionStatus.REJECTED)]
+ def rejectedCmHandles = [new TargetCmHandle('CMHandle1', SubscriptionStatus.REJECTED, 'Cm handle does not exist'),
+ new TargetCmHandle('CMHandle2',SubscriptionStatus.REJECTED, 'Cm handle does not exist'),
+ new TargetCmHandle('CMHandle3',SubscriptionStatus.REJECTED, 'Cm handle does not exist')]
and: 'a yang model subscription event will be saved into the db with rejected cm handles'
- def yangModelSubscriptionEventWithRejectedCmHandles = subscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent)
- yangModelSubscriptionEventWithRejectedCmHandles.getPredicates().setTargetCmHandles(rejectedCmHandles)
+ def yangModelSubscriptionEvent = subscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent)
+ yangModelSubscriptionEvent.getPredicates().setTargetCmHandles(rejectedCmHandles)
and: 'the InventoryPersistence returns no private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> []
and: 'the thread creation delay is reduced to 2 seconds for testing'
@@ -150,7 +147,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, 'subscriptionCreatedStatus')
then: 'the event is not added to the forwarded subscription event cache'
0 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
and: 'the event is not being forwarded with the CMHandle private properties and does not provides a valid listenable future'
@@ -175,9 +172,9 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
0 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
and: 'the persistence service save target cm handles of the yang model subscription event as rejected '
- 1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEventWithRejectedCmHandles)
+ 1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent)
and: 'subscription outcome has been sent'
- 1 * mockSubscriptionEventResponseOutcome.sendResponse('SCO-9989752', 'cm-subscription-001')
+ 1 * mockSubscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse, 'subscriptionCreatedStatus')
}
static def createYangModelCmHandleWithDmiProperty(id, dmiId,propertyName, propertyValue) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumerSpec.groovy
index 5355dd8b9a..7cc40cc90e 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumerSpec.groovy
@@ -22,17 +22,27 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistenceImpl
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.spi.model.DataNodeBuilder
import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ @Autowired
+ ObjectMapper objectMapper
+
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
def mockSubscriptionPersistence = Mock(SubscriptionPersistenceImpl)
def mockSubscriptionEventResponseMapper = Mock(SubscriptionEventResponseMapper)
@@ -41,72 +51,90 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache,
mockSubscriptionPersistence, mockSubscriptionEventResponseMapper, mockSubscriptionEventResponseOutcome)
- def cmHandleToStatusMap = [CMHandle1: 'PENDING', CMHandle2: 'ACCEPTED'] as Map
- def testEventReceived = new SubscriptionEventResponse(clientId: 'some-client-id',
- subscriptionName: 'some-subscription-name', dmiName: 'some-dmi-name', cmHandleIdToStatus: cmHandleToStatusMap)
- def consumerRecord = new ConsumerRecord<String, SubscriptionEventResponse>('topic-name', 0, 0, 'event-key', testEventReceived)
-
def 'Consume Subscription Event Response where all DMIs have responded'() {
- given: 'a subscription event response and notifications are enabled'
- objectUnderTest.notificationFeatureEnabled = isNotificationFeatureEnabled
+ given: 'a consumer record including cloud event having subscription response'
+ def consumerRecordWithCloudEventAndSubscriptionResponse = getConsumerRecord()
+ and: 'a subscription response event'
+ def subscriptionResponseEvent = getSubscriptionResponseEvent()
+ and: 'a subscription event response and notifications are enabled'
+ objectUnderTest.notificationFeatureEnabled = notificationEnabled
and: 'subscription model loader is enabled'
- objectUnderTest.subscriptionModelLoaderEnabled = true
- and: 'a data node exist in db'
- def leaves1 = [status:'ACCEPTED', cmHandleId:'cmhandle1'] as Map
- def dataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
- .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
- .withLeaves(leaves1).build()
- and: 'subscription persistence service returns data node'
- mockSubscriptionPersistence.getCmHandlesForSubscriptionEvent(*_) >> [dataNode]
+ objectUnderTest.subscriptionModelLoaderEnabled = modelLoaderEnabled
+ and: 'subscription persistence service returns data node includes no pending cm handle'
+ mockSubscriptionPersistence.getCmHandlesForSubscriptionEvent(*_) >> [getDataNode()]
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(consumerRecord)
+ objectUnderTest.consumeSubscriptionEventResponse(consumerRecordWithCloudEventAndSubscriptionResponse)
then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
- 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
- 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name'] as Set)
+ 1 * mockForwardedSubscriptionEventCache.containsKey('SCO-9989752cm-subscription-001') >> true
+ 1 * mockForwardedSubscriptionEventCache.get('SCO-9989752cm-subscription-001') >> (['some-dmi-name'] as Set)
and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed'
- 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> ([] as Set)
+ 1 * mockForwardedSubscriptionEventCache.get('SCO-9989752cm-subscription-001') >> ([] as Set)
+ and: 'the response event is map to yang model'
+ numberOfTimeToPersist * mockSubscriptionEventResponseMapper.toYangModelSubscriptionEvent(_)
+ and: 'the response event is persisted into the db'
+ numberOfTimeToPersist * mockSubscriptionPersistence.saveSubscriptionEvent(_)
and: 'the subscription event is removed from the map'
- numberOfExpectedCallToRemove * mockForwardedSubscriptionEventCache.remove('some-client-idsome-subscription-name')
+ numberOfTimeToRemove * mockForwardedSubscriptionEventCache.remove('SCO-9989752cm-subscription-001')
and: 'a response outcome has been created'
- numberOfExpectedCallToSendResponse * mockSubscriptionEventResponseOutcome.sendResponse('some-client-id', 'some-subscription-name')
+ numberOfTimeToResponse * mockSubscriptionEventResponseOutcome.sendResponse(subscriptionResponseEvent, 'subscriptionCreated')
where: 'the following values are used'
- scenario | isNotificationFeatureEnabled || numberOfExpectedCallToRemove || numberOfExpectedCallToSendResponse
- 'Response sent' | true || 1 || 1
- 'Response not sent' | false || 0 || 0
+ scenario | modelLoaderEnabled | notificationEnabled || numberOfTimeToPersist || numberOfTimeToRemove || numberOfTimeToResponse
+ 'Both model loader and notification are enabled' | true | true || 1 || 1 || 1
+ 'Both model loader and notification are disabled' | false | false || 0 || 0 || 0
+ 'Model loader enabled and notification disabled' | true | false || 1 || 0 || 0
+ 'Model loader disabled and notification enabled' | false | true || 0 || 1 || 1
}
def 'Consume Subscription Event Response where another DMI has not yet responded'() {
given: 'a subscription event response and notifications are enabled'
- objectUnderTest.notificationFeatureEnabled = true
+ objectUnderTest.notificationFeatureEnabled = notificationEnabled
and: 'subscription model loader is enabled'
- objectUnderTest.subscriptionModelLoaderEnabled = true
+ objectUnderTest.subscriptionModelLoaderEnabled = modelLoaderEnabled
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(consumerRecord)
+ objectUnderTest.consumeSubscriptionEventResponse(getConsumerRecord())
then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
- 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
- 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name', 'non-responded-dmi'] as Set)
+ 1 * mockForwardedSubscriptionEventCache.containsKey('SCO-9989752cm-subscription-001') >> true
+ 1 * mockForwardedSubscriptionEventCache.get('SCO-9989752cm-subscription-001') >> (['responded-dmi', 'non-responded-dmi'] as Set)
and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed'
- 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['non-responded-dmi'] as Set)
+ 1 * mockForwardedSubscriptionEventCache.get('SCO-9989752cm-subscription-001') >> (['non-responded-dmi'] as Set)
+ and: 'the response event is map to yang model'
+ numberOfTimeToPersist * mockSubscriptionEventResponseMapper.toYangModelSubscriptionEvent(_)
+ and: 'the response event is persisted into the db'
+ numberOfTimeToPersist * mockSubscriptionPersistence.saveSubscriptionEvent(_)
+ and: 'the subscription event is removed from the map'
and: 'the subscription event is not removed from the map'
0 * mockForwardedSubscriptionEventCache.remove(_)
and: 'a response outcome has not been created'
0 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
+ where: 'the following values are used'
+ scenario | modelLoaderEnabled | notificationEnabled || numberOfTimeToPersist
+ 'Both model loader and notification are enabled' | true | true || 1
+ 'Both model loader and notification are disabled' | false | false || 0
+ 'Model loader enabled and notification disabled' | true | false || 1
+ 'Model loader disabled and notification enabled' | false | true || 0
}
- def 'Update subscription event when the model loader flag is enabled'() {
- given: 'subscription model loader is enabled as per #scenario'
- objectUnderTest.subscriptionModelLoaderEnabled = isSubscriptionModelLoaderEnabled
- when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(consumerRecord)
- then: 'the forwarded subscription event cache does not return dmiName for the subscription create event'
- 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> false
- and: 'the mapper returns yang model subscription event with #numberOfExpectedCall'
- numberOfExpectedCall * mockSubscriptionEventResponseMapper.toYangModelSubscriptionEvent(_)
- and: 'subscription event has been updated into DB with #numberOfExpectedCall'
- numberOfExpectedCall * mockSubscriptionPersistence.saveSubscriptionEvent(_)
- where: 'the following values are used'
- scenario | isSubscriptionModelLoaderEnabled || numberOfExpectedCall
- 'The event is updated' | true || 1
- 'The event is not updated' | false || 0
+ def getSubscriptionResponseEvent() {
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
+ return jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ }
+
+ def getCloudEventHavingSubscriptionResponseEvent() {
+ return CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(getSubscriptionResponseEvent()))
+ .withId('some-id')
+ .withType('subscriptionCreated')
+ .withSource(URI.create('NCMP')).build()
+ }
+
+ def getConsumerRecord() {
+ return new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', getCloudEventHavingSubscriptionResponseEvent())
+ }
+
+ def getDataNode() {
+ def leaves = [status:'ACCEPTED', cmHandleId:'cmhandle1'] as Map
+ return new DataNodeBuilder().withDataspace('NCMP-Admin')
+ .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+ .withLeaves(leaves).build()
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy
index 00412aa933..4c60281f85 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy
@@ -22,9 +22,8 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapstruct.factory.Mappers
-import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
@@ -50,13 +49,12 @@ class SubscriptionEventResponseMapperSpec extends Specification {
assert result.clientId == "SCO-9989752"
and: 'subscription name'
assert result.subscriptionName == "cm-subscription-001"
- and: 'predicate targets '
- assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle3", "CMHandle4", "CMHandle5"]
+ and: 'predicate targets cm handle size as expected'
+ assert result.predicates.targetCmHandles.size() == 7
+ and: 'predicate targets cm handle ids as expected'
+ assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle2", "CMHandle3", "CMHandle4", "CMHandle5", "CMHandle6", "CMHandle7"]
and: 'the status for these targets is set to expected values'
- assert result.predicates.targetCmHandles.status == [SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED,
- SubscriptionStatus.PENDING, SubscriptionStatus.PENDING]
- and: 'the topic is null'
- assert result.topic == null
+ assert result.predicates.targetCmHandles.status == [SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.PENDING, SubscriptionStatus.PENDING, SubscriptionStatus.PENDING]
}
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy
index bb0e7b73a0..c1c428b13f 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy
@@ -21,13 +21,16 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.common.header.internals.RecordHeaders
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
import org.mapstruct.factory.Mappers
+import org.onap.cps.ncmp.api.NcmpEventResponseCode
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
import org.onap.cps.ncmp.api.impl.utils.DataNodeBaseSpec
-import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
@@ -43,72 +46,77 @@ class SubscriptionEventResponseOutcomeSpec extends DataNodeBaseSpec {
@SpringBean
SubscriptionPersistence mockSubscriptionPersistence = Mock(SubscriptionPersistence)
@SpringBean
- EventsPublisher<SubscriptionEventOutcome> mockSubscriptionEventOutcomePublisher = Mock(EventsPublisher<SubscriptionEventOutcome>)
+ EventsPublisher<CloudEvent> mockSubscriptionEventOutcomePublisher = Mock(EventsPublisher<CloudEvent>)
@SpringBean
SubscriptionOutcomeMapper subscriptionOutcomeMapper = Mappers.getMapper(SubscriptionOutcomeMapper)
@Autowired
JsonObjectMapper jsonObjectMapper
+ @Autowired
+ ObjectMapper objectMapper
+
def 'Send response to the client apps successfully'() {
- given: 'a subscription client id and subscription name'
- def clientId = 'some-client-id'
- def subscriptionName = 'some-subscription-name'
- and: 'the persistence service return a data node'
+ given: 'a subscription response event'
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ and: 'a subscription outcome event'
+ def subscriptionOutcomeJsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent2.json')
+ def subscriptionOutcomeEvent = jsonObjectMapper.convertJsonString(subscriptionOutcomeJsonData, SubscriptionEventOutcome.class)
+ and: 'a random id for the cloud event'
+ SubscriptionOutcomeCloudMapper.randomId = 'some-id'
+ and: 'a cloud event containing the outcome event'
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(subscriptionOutcomeEvent))
+ .withId('some-id')
+ .withType('subscriptionCreatedStatus')
+ .withDataSchema(URI.create('urn:cps:' + 'org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome' + ':1.0.0'))
+ .withExtension("correlationid", 'SCO-9989752cm-subscription-001')
+ .withSource(URI.create('NCMP')).build()
+ and: 'the persistence service return a data node that includes pending cm handles that makes it partial success'
mockSubscriptionPersistence.getCmHandlesForSubscriptionEvent(*_) >> [dataNode4]
- and: 'the response is being generated from the db'
- def eventOutcome = objectUnderTest.generateResponse(clientId, subscriptionName)
when: 'the response is being sent'
- objectUnderTest.sendResponse(clientId, subscriptionName)
- then: 'the publisher publish the response with expected parameters'
- 1 * mockSubscriptionEventOutcomePublisher.publishEvent('cm-avc-subscription-response', clientId + subscriptionName, new RecordHeaders(), eventOutcome)
+ objectUnderTest.sendResponse(subscriptionResponseEvent, 'subscriptionCreatedStatus')
+ then: 'the publisher publish the cloud event with itself and expected parameters'
+ 1 * mockSubscriptionEventOutcomePublisher.publishCloudEvent('subscription-response', 'SCO-9989752cm-subscription-001', testCloudEventSent)
+ }
+
+ def 'Create subscription outcome message as expected'() {
+ given: 'a subscription response event'
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ and: 'a subscription outcome event'
+ def subscriptionOutcomeJsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json')
+ def subscriptionOutcomeEvent = jsonObjectMapper.convertJsonString(subscriptionOutcomeJsonData, SubscriptionEventOutcome.class)
+ and: 'a status code and status message a per #scenarios'
+ subscriptionOutcomeEvent.getData().setStatusCode(statusCode)
+ subscriptionOutcomeEvent.getData().setStatusMessage(statusMessage)
+ when: 'a subscription event outcome message is being formed'
+ def result = objectUnderTest.fromSubscriptionEventResponse(subscriptionResponseEvent, ncmpEventResponseCode)
+ then: 'the result will be equal to event outcome'
+ result == subscriptionOutcomeEvent
+ where: 'the following values are used'
+ scenario | ncmpEventResponseCode || statusMessage || statusCode
+ 'is full outcome' | NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION || 'successfully applied subscription' || 1
+ 'is partial outcome' | NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION || 'partially applied subscription' || 104
}
def 'Check cm handle id to status map to see if it is a full outcome response'() {
when: 'is full outcome response evaluated'
- def response = objectUnderTest.isFullOutcomeResponse(cmHandleIdToStatusMap)
+ def response = objectUnderTest.decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap)
then: 'the result will be as expected'
- response == expectedResult
+ response == expectedOutcomeResponseDecision
where: 'the following values are used'
- scenario | cmHandleIdToStatusMap || expectedResult
- 'The map contains PENDING status' | ['CMHandle1': SubscriptionStatus.PENDING] as Map || false
- 'The map contains ACCEPTED status' | ['CMHandle1': SubscriptionStatus.ACCEPTED] as Map || true
- 'The map contains REJECTED status' | ['CMHandle1': SubscriptionStatus.REJECTED] as Map || true
- 'The map contains PENDING and ACCEPTED statuses' | ['CMHandle1': SubscriptionStatus.PENDING,'CMHandle2': SubscriptionStatus.ACCEPTED] as Map || false
- 'The map contains REJECTED and ACCEPTED statuses' | ['CMHandle1': SubscriptionStatus.REJECTED,'CMHandle2': SubscriptionStatus.ACCEPTED] as Map || true
- 'The map contains PENDING and REJECTED statuses' | ['CMHandle1': SubscriptionStatus.PENDING,'CMHandle2': SubscriptionStatus.REJECTED] as Map || false
+ scenario | cmHandleIdToStatusAndDetailsAsMap || expectedOutcomeResponseDecision
+ 'The map contains PENDING status' | [CMHandle1: [details:'Subscription forwarded to dmi plugin',status:'PENDING'] as Map] as Map || NcmpEventResponseCode.SUBSCRIPTION_PENDING
+ 'The map contains ACCEPTED status' | [CMHandle1: [details:'',status:'ACCEPTED'] as Map] as Map || NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION
+ 'The map contains REJECTED status' | [CMHandle1: [details:'Cm handle does not exist',status:'REJECTED'] as Map] as Map || NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE
+ 'The map contains PENDING and PENDING statuses' | [CMHandle1: [details:'Some details',status:'PENDING'] as Map, CMHandle2: [details:'Some details',status:'PENDING'] as Map as Map] as Map || NcmpEventResponseCode.SUBSCRIPTION_PENDING
+ 'The map contains ACCEPTED and ACCEPTED statuses' | [CMHandle1: [details:'',status:'ACCEPTED'] as Map, CMHandle2: [details:'',status:'ACCEPTED'] as Map as Map] as Map || NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION
+ 'The map contains REJECTED and REJECTED statuses' | [CMHandle1: [details:'Reject details',status:'REJECTED'] as Map, CMHandle2: [details:'Reject details',status:'REJECTED'] as Map as Map] as Map || NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE
+ 'The map contains PENDING and ACCEPTED statuses' | [CMHandle1: [details:'Some details',status:'PENDING'] as Map, CMHandle2: [details:'',status:'ACCEPTED'] as Map as Map] as Map || NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION
+ 'The map contains REJECTED and ACCEPTED statuses' | [CMHandle1: [details:'Cm handle does not exist',status:'REJECTED'] as Map, CMHandle2: [details:'',status:'ACCEPTED'] as Map as Map] as Map || NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION
+ 'The map contains PENDING and REJECTED statuses' | [CMHandle1: [details:'Subscription forwarded to dmi plugin',status:'PENDING'] as Map, CMHandle2: [details:'Cm handle does not exist',status:'REJECTED'] as Map as Map] as Map || NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION
}
- def 'Generate response via fetching data nodes from database.'() {
- given: 'a db call to get data nodes for subscription event'
- 1 * mockSubscriptionPersistence.getCmHandlesForSubscriptionEvent(*_) >> [dataNode4]
- when: 'a response is generated'
- def result = objectUnderTest.generateResponse('some-client-id', 'some-subscription-name')
- then: 'the result will have the same values as same as in dataNode4'
- result.eventType == SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME
- result.getEvent().getSubscription().getClientID() == 'some-client-id'
- result.getEvent().getSubscription().getName() == 'some-subscription-name'
- result.getEvent().getPredicates().getPendingTargets() == ['CMHandle3']
- result.getEvent().getPredicates().getRejectedTargets() == ['CMHandle1']
- result.getEvent().getPredicates().getAcceptedTargets() == ['CMHandle2']
- }
-
- def 'Form subscription outcome message with a list of cm handle id to status mapping'() {
- given: 'a list of collection including cm handle id to status'
- def cmHandleIdToStatus = [['PENDING', 'CMHandle5'], ['PENDING', 'CMHandle4'], ['ACCEPTED', 'CMHandle1'], ['REJECTED', 'CMHandle3']]
- and: 'an outcome event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json')
- def eventOutcome = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEventOutcome.class)
- eventOutcome.setEventType(expectedEventType)
- when: 'a subscription outcome message formed'
- def result = objectUnderTest.formSubscriptionOutcomeMessage(cmHandleIdToStatus, 'SCO-9989752',
- 'cm-subscription-001', isFullOutcomeResponse)
- result.getEvent().getPredicates().getPendingTargets().sort()
- then: 'the result will be equal to event outcome'
- result == eventOutcome
- where: 'the following values are used'
- scenario | isFullOutcomeResponse || expectedEventType
- 'is full outcome' | true || SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME
- 'is partial outcome' | false || SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME
- }
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapperSpec.groovy
index 7f1a628291..f5fbdfcb56 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapperSpec.groovy
@@ -22,9 +22,10 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapstruct.factory.Mappers
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
-import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.spi.exceptions.DataValidationException
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@@ -43,19 +44,44 @@ class SubscriptionOutcomeMapperSpec extends Specification {
given: 'a Subscription Response Event'
def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
- and: 'a Subscription Outcome Event'
- def jsonDataOutcome = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json')
- def expectedEventOutcome = jsonObjectMapper.convertJsonString(jsonDataOutcome, SubscriptionEventOutcome.class)
- expectedEventOutcome.setEventType(expectedEventType)
when: 'the subscription response event is mapped to a subscription event outcome'
def result = objectUnderTest.toSubscriptionEventOutcome(subscriptionResponseEvent)
- result.setEventType(expectedEventType)
- then: 'the resulting subscription event outcome contains the correct clientId'
- assert result == expectedEventOutcome
+ then: 'the resulting subscription event outcome contains expected pending targets per details grouping'
+ def pendingCmHandleTargetsPerDetails = result.getData().getAdditionalInfo().getPending()
+ assert pendingCmHandleTargetsPerDetails.get(0).getDetails() == 'EMS or node connectivity issues, retrying'
+ assert pendingCmHandleTargetsPerDetails.get(0).getTargets() == ['CMHandle5', 'CMHandle6','CMHandle7']
+ and: 'the resulting subscription event outcome contains expected rejected targets per details grouping'
+ def rejectedCmHandleTargetsPerDetails = result.getData().getAdditionalInfo().getRejected()
+ assert rejectedCmHandleTargetsPerDetails.get(0).getDetails() == 'Target(s) do not exist'
+ assert rejectedCmHandleTargetsPerDetails.get(0).getTargets() == ['CMHandle4']
+ assert rejectedCmHandleTargetsPerDetails.get(1).getDetails() == 'Faulty subscription format for target(s)'
+ assert rejectedCmHandleTargetsPerDetails.get(1).getTargets() == ['CMHandle1', 'CMHandle2','CMHandle3']
+ }
+
+ def 'Map subscription event response with null of subscription status list to subscription event outcome causes an exception'() {
+ given: 'a Subscription Response Event'
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ and: 'set subscription status list to null'
+ subscriptionResponseEvent.getData().setSubscriptionStatus(subscriptionStatusList)
+ when: 'the subscription response event is mapped to a subscription event outcome'
+ objectUnderTest.toSubscriptionEventOutcome(subscriptionResponseEvent)
+ then: 'a DataValidationException is thrown with an expected exception details'
+ def exception = thrown(DataValidationException)
+ exception.details == 'SubscriptionStatus list cannot be null or empty'
where: 'the following values are used'
- scenario || expectedEventType
- 'is full outcome' || SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME
- 'is partial outcome' || SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME
+ scenario || subscriptionStatusList
+ 'A null subscription status list' || null
+ 'An empty subscription status list' || new ArrayList<SubscriptionStatus>()
}
+ def 'Map subscription event response with subscription status list to subscription event outcome without any exception'() {
+ given: 'a Subscription Response Event'
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ when: 'the subscription response event is mapped to a subscription event outcome'
+ objectUnderTest.toSubscriptionEventOutcome(subscriptionResponseEvent)
+ then: 'no exception thrown'
+ noExceptionThrown()
+ }
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
index ec54e8917a..7116a17862 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
@@ -59,14 +59,15 @@ class SubscriptionPersistenceSpec extends Specification {
SUBSCRIPTION_REGISTRY_PARENT,
'{"subscription":[{' +
'"topic":"some-topic",' +
- '"predicates":{"datastore":"some-datastore","targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING"},{"cmHandleId":"cmhandle2","status":"PENDING"}]},' +
+ '"predicates":{"datastore":"some-datastore","targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING","details":"Subscription forwarded to dmi plugin"},' +
+ '{"cmHandleId":"cmhandle2","status":"PENDING","details":"Subscription forwarded to dmi plugin"}]},' +
'"clientID":"some-client-id","subscriptionName":"some-subscription-name","isTagged":true}]}',
NO_TIMESTAMP)
}
def 'add or replace cm handle list element into db' () {
given: 'a data node with child node exist in db'
- def leaves1 = [status:'PENDING', cmHandleId:'cmhandle1'] as Map
+ def leaves1 = [status:'REJECTED', cmHandleId:'cmhandle1', details:'Cm handle does not exist'] as Map
def childDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
.withLeaves(leaves1).build()
@@ -81,11 +82,11 @@ class SubscriptionPersistenceSpec extends Specification {
objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
then: 'the cpsDataService save non-existing cm handle with the correct data'
1 * mockCpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle2","status":"PENDING"}]}',
+ SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle2","status":"PENDING","details":"Subscription forwarded to dmi plugin"}]}',
NO_TIMESTAMP)
and: 'the cpsDataService update existing cm handle with the correct data'
1 * mockCpsDataService.updateNodeLeaves(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING"}]}',
+ SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING","details":"Subscription forwarded to dmi plugin"}]}',
NO_TIMESTAMP)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy
index 7474166ffe..e28a10261e 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy
@@ -25,13 +25,13 @@ import spock.lang.Specification
class DataNodeBaseSpec extends Specification {
- def leaves1 = [status:'PENDING', cmHandleId:'CMHandle3'] as Map
+ def leaves1 = [status:'PENDING', cmHandleId:'CMHandle3', details:'Subscription forwarded to dmi plugin'] as Map
def dataNode1 = createDataNodeWithLeaves(leaves1)
- def leaves2 = [status:'ACCEPTED', cmHandleId:'CMHandle2'] as Map
+ def leaves2 = [status:'ACCEPTED', cmHandleId:'CMHandle2', details:''] as Map
def dataNode2 = createDataNodeWithLeaves(leaves2)
- def leaves3 = [status:'REJECTED', cmHandleId:'CMHandle1'] as Map
+ def leaves3 = [status:'REJECTED', cmHandleId:'CMHandle1', details:'Cm handle does not exist'] as Map
def dataNode3 = createDataNodeWithLeaves(leaves3)
def leaves4 = [datastore:'passthrough-running'] as Map
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
index 819f1fa08e..28db7babf9 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
@@ -20,7 +20,6 @@
package org.onap.cps.ncmp.api.impl.utils
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
import org.onap.cps.spi.model.DataNodeBuilder
class DataNodeHelperSpec extends DataNodeBaseSpec {
@@ -38,9 +37,9 @@ class DataNodeHelperSpec extends DataNodeBaseSpec {
and: 'all the leaves result list are equal to given leaves of data nodes'
result[0] == [clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001']
result[1] == [datastore:'passthrough-running']
- result[2] == [status:'PENDING', cmHandleId:'CMHandle3']
- result[3] == [status:'ACCEPTED', cmHandleId:'CMHandle2']
- result[4] == [status:'REJECTED', cmHandleId:'CMHandle1']
+ result[2] == [status:'PENDING', cmHandleId:'CMHandle3', details:'Subscription forwarded to dmi plugin']
+ result[3] == [status:'ACCEPTED', cmHandleId:'CMHandle2', details:'']
+ result[4] == [status:'REJECTED', cmHandleId:'CMHandle1', details:'Cm handle does not exist']
}
def 'Get cm handle id to status as expected from a nested data node.'() {
@@ -52,26 +51,18 @@ class DataNodeHelperSpec extends DataNodeBaseSpec {
and: 'the nested data node is flatten and retrieves the leaves '
def leaves = DataNodeHelper.getDataNodeLeaves([dataNode])
when:'cm handle id to status is retrieved'
- def result = DataNodeHelper.getCmHandleIdToStatus(leaves)
+ def result = DataNodeHelper.cmHandleIdToStatusAndDetailsAsMap(leaves)
then: 'the result list size is 3'
result.size() == 3
and: 'the result contains expected values'
- result[0] as List == ['PENDING', 'CMHandle3']
- result[1] as List == ['ACCEPTED', 'CMHandle2']
- result[2] as List == ['REJECTED', 'CMHandle1']
- }
+ result == [
+ CMHandle3: [details:'Subscription forwarded to dmi plugin',status:'PENDING'] as Map,
+ CMHandle2: [details:'',status:'ACCEPTED'] as Map,
+ CMHandle1: [details:'Cm handle does not exist',status:'REJECTED'] as Map
+ ] as Map
- def 'Get cm handle id to status map as expected from list of collection' () {
- given: 'a list of collection'
- def cmHandleCollection = [['PENDING', 'CMHandle3'], ['ACCEPTED', 'CMHandle2'], ['REJECTED', 'CMHandle1']]
- when: 'the map is formed up with a method call'
- def result = DataNodeHelper.getCmHandleIdToStatusMap(cmHandleCollection)
- then: 'the map values are as expected'
- result.keySet() == ['CMHandle3', 'CMHandle2', 'CMHandle1'] as Set
- result.values() as List == [SubscriptionStatus.PENDING, SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED]
}
-
def 'Get cm handle id to status map as expected from a nested data node.'() {
given: 'a nested data node'
def dataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
@@ -79,8 +70,14 @@ class DataNodeHelperSpec extends DataNodeBaseSpec {
.withLeaves([clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001'])
.withChildDataNodes([dataNode4]).build()
when:'cm handle id to status is being extracted'
- def result = DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes([dataNode]);
- then: 'the keys are retrieved as expected'
- result.keySet() == ['CMHandle3','CMHandle2','CMHandle1'] as Set
+ def result = DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode([dataNode]);
+ then: 'the result list size is 3'
+ result.size() == 3
+ and: 'the result contains expected values'
+ result == [
+ CMHandle3: [details:'Subscription forwarded to dmi plugin',status:'PENDING'] as Map,
+ CMHandle2: [details:'',status:'ACCEPTED'] as Map,
+ CMHandle1: [details:'Cm handle does not exist',status:'REJECTED'] as Map
+ ] as Map
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy
index 61eb319101..bc19e2dde8 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.core.builder.CloudEventBuilder
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent
import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.spi.exceptions.CloudEventConstructionException
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@@ -45,7 +46,7 @@ class SubscriptionEventCloudMapperSpec extends Specification {
def testCloudEvent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventData))
.withId('some-event-id')
- .withType('CREATE')
+ .withType('subscriptionCreated')
.withSource(URI.create('some-resource'))
.withExtension('correlationid', 'test-cmhandle1').build()
when: 'the cloud event map to subscription event'
@@ -59,7 +60,7 @@ class SubscriptionEventCloudMapperSpec extends Specification {
def testCloudEvent = CloudEventBuilder.v1()
.withData(null)
.withId('some-event-id')
- .withType('CREATE')
+ .withType('subscriptionCreated')
.withSource(URI.create('some-resource'))
.withExtension('correlationid', 'test-cmhandle1').build()
when: 'the cloud event map to subscription event'
@@ -75,30 +76,29 @@ class SubscriptionEventCloudMapperSpec extends Specification {
org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent.class)
def testCloudEvent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventData))
- .withId('some-event-key')
- .withType('CREATE')
- .withSource(URI.create('some-resource'))
+ .withId('some-id')
+ .withType('subscriptionCreated')
+ .withSource(URI.create('SCO-9989752'))
.withExtension('correlationid', 'test-cmhandle1').build()
when: 'the subscription event map to data of cloud event'
- def resultCloudEvent = SubscriptionEventCloudMapper.toCloudEvent(testEventData, 'some-event-key')
+ SubscriptionEventCloudMapper.randomId = 'some-id'
+ def resultCloudEvent = SubscriptionEventCloudMapper.toCloudEvent(testEventData, 'some-event-key', 'subscriptionCreated')
then: 'the subscription event resulted having expected values'
resultCloudEvent.getData() == testCloudEvent.getData()
resultCloudEvent.getId() == testCloudEvent.getId()
resultCloudEvent.getType() == testCloudEvent.getType()
+ resultCloudEvent.getSource() == URI.create('SCO-9989752')
+ resultCloudEvent.getDataSchema() == URI.create('urn:cps:org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent:1.0.0')
}
def 'Map the subscription event to data of the cloud event with wrong content causes an exception'() {
given: 'an empty ncmp subscription event'
def testNcmpSubscriptionEvent = new org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent()
when: 'the subscription event map to data of cloud event'
- def thrownException = null
- try {
- SubscriptionEventCloudMapper.toCloudEvent(testNcmpSubscriptionEvent, 'some-key')
- } catch (Exception e) {
- thrownException = e
- }
+ SubscriptionEventCloudMapper.toCloudEvent(testNcmpSubscriptionEvent, 'some-key', 'some-event-type')
then: 'a run time exception is thrown'
- assert thrownException instanceof RuntimeException
+ def exception = thrown(CloudEventConstructionException)
+ exception.details == 'Invalid object to serialize or required headers is missing'
}
}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index edbd7022f2..7442670920 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -30,7 +30,7 @@ app:
async-m2m:
topic: ncmp-async-m2m
avc:
- subscription-topic: cm-avc-subscription
+ subscription-topic: subscription
cm-events-topic: cm-events
subscription-forward-topic-prefix: ${NCMP_FORWARD_CM_AVC_SUBSCRIPTION:ncmp-dmi-cm-avc-subscription-}
diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json b/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json
index 3244f05a03..52ca1df62b 100644
--- a/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json
+++ b/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json
@@ -1,11 +1,44 @@
{
- "clientId": "SCO-9989752",
- "subscriptionName": "cm-subscription-001",
- "dmiName": "ncmp-dmi-plugin",
- "cmHandleIdToStatus": {
- "CMHandle1": "ACCEPTED",
- "CMHandle3": "REJECTED",
- "CMHandle4": "PENDING",
- "CMHandle5": "PENDING"
+ "data": {
+ "clientId": "SCO-9989752",
+ "subscriptionName": "cm-subscription-001",
+ "dmiName": "dminame1",
+ "subscriptionStatus": [
+ {
+ "id": "CMHandle1",
+ "status": "REJECTED",
+ "details": "Faulty subscription format for target(s)"
+ },
+ {
+ "id": "CMHandle2",
+ "status": "REJECTED",
+ "details": "Faulty subscription format for target(s)"
+ },
+ {
+ "id": "CMHandle3",
+ "status": "REJECTED",
+ "details": "Faulty subscription format for target(s)"
+ },
+ {
+ "id": "CMHandle4",
+ "status": "REJECTED",
+ "details": "Target(s) do not exist"
+ },
+ {
+ "id": "CMHandle5",
+ "status": "PENDING",
+ "details": "EMS or node connectivity issues, retrying"
+ },
+ {
+ "id": "CMHandle6",
+ "status": "PENDING",
+ "details": "EMS or node connectivity issues, retrying"
+ },
+ {
+ "id": "CMHandle7",
+ "status": "PENDING",
+ "details": "EMS or node connectivity issues, retrying"
+ }
+ ]
}
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json
index 6bfa36bf79..2d83bdddcb 100644
--- a/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json
+++ b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json
@@ -1,20 +1,23 @@
{
- "eventType": "PARTIAL_OUTCOME",
- "event": {
- "subscription": {
- "clientID": "SCO-9989752",
- "name": "cm-subscription-001"
- },
- "predicates": {
- "rejectedTargets": [
- "CMHandle3"
+ "data": {
+ "statusCode": 104,
+ "statusMessage": "partially applied subscription",
+ "additionalInfo": {
+ "rejected": [
+ {
+ "details": "Target(s) do not exist",
+ "targets": ["CMHandle4"]
+ },
+ {
+ "details": "Faulty subscription format for target(s)",
+ "targets": ["CMHandle1", "CMHandle2", "CMHandle3"]
+ }
],
- "acceptedTargets": [
- "CMHandle1"
- ],
- "pendingTargets": [
- "CMHandle4",
- "CMHandle5"
+ "pending": [
+ {
+ "details": "EMS or node connectivity issues, retrying",
+ "targets": ["CMHandle5", "CMHandle6", "CMHandle7"]
+ }
]
}
}
diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent2.json b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent2.json
new file mode 100644
index 0000000000..35ff0241df
--- /dev/null
+++ b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent2.json
@@ -0,0 +1,20 @@
+{
+ "data": {
+ "statusCode": 104,
+ "statusMessage": "partially applied subscription",
+ "additionalInfo": {
+ "rejected": [
+ {
+ "details": "Cm handle does not exist",
+ "targets": ["CMHandle1"]
+ }
+ ],
+ "pending": [
+ {
+ "details": "Subscription forwarded to dmi plugin",
+ "targets": ["CMHandle3"]
+ }
+ ]
+ }
+ }
+} \ No newline at end of file