summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java9
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java9
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java39
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java29
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java132
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java109
8 files changed, 216 insertions, 150 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
index c178700ee..176e644ba 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
@@ -24,6 +24,7 @@ import com.hazelcast.map.IMap;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
@Slf4j
@RequiredArgsConstructor
@@ -31,8 +32,7 @@ public class ResponseTimeoutTask implements Runnable {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
- private final String subscriptionClientId;
- private final String subscriptionName;
+ private final SubscriptionEventResponse subscriptionEventResponse;
@Override
public void run() {
@@ -47,9 +47,12 @@ public class ResponseTimeoutTask implements Runnable {
}
private void generateAndSendResponse() {
+ final String subscriptionClientId = subscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
- subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse,
+ "subscriptionCreatedStatus");
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
index f511965c7..5afc52d7e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
@@ -58,22 +58,23 @@ public class SubscriptionEventConsumer {
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
+ final String eventType = subscriptionEventConsumerRecord.value().getType();
final SubscriptionEvent subscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent);
final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore();
- if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) {
+ if (!eventDatastore.equals("passthrough-running")) {
throw new OperationNotYetSupportedException(
- "passthrough datastores are currently only supported for event subscriptions");
+ "passthrough-running datastores are currently only supported for event subscriptions");
}
if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) {
if (subscriptionModelLoaderEnabled) {
persistSubscriptionEvent(subscriptionEvent);
}
- if ("CREATE".equals(cloudEvent.getType())) {
+ if ("subscriptionCreated".equals(cloudEvent.getType())) {
log.info("Subscription for ClientID {} with name {} ...",
subscriptionEvent.getData().getSubscription().getClientID(),
subscriptionEvent.getData().getSubscription().getName());
if (notificationFeatureEnabled) {
- subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent);
+ subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, eventType);
}
}
} else {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 1fe963a27..f196cb01e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -44,6 +44,8 @@ import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
import org.springframework.beans.factory.annotation.Value;
@@ -74,7 +76,7 @@ public class SubscriptionEventForwarder {
*
* @param subscriptionEvent the event to be forwarded
*/
- public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, final String eventType) {
final List<String> cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets();
if (cmHandleTargets == null || cmHandleTargets.isEmpty()
|| cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) {
@@ -85,13 +87,19 @@ public class SubscriptionEventForwarder {
inventoryPersistence.getYangModelCmHandles(cmHandleTargets);
final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
= DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
- findDmisAndRespond(subscriptionEvent, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
+ findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
}
- private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent,
+ private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType,
final List<String> cmHandleTargetsAsStrings,
final Map<String, Map<String, Map<String, String>>>
dmiPropertiesPerCmHandleIdPerServiceName) {
+ final SubscriptionEventResponse emptySubscriptionEventResponse =
+ new SubscriptionEventResponse().withData(new Data());
+ emptySubscriptionEventResponse.getData().setSubscriptionName(
+ subscriptionEvent.getData().getSubscription().getName());
+ emptySubscriptionEventResponse.getData().setClientId(
+ subscriptionEvent.getData().getSubscription().getClientID());
final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
.map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
@@ -104,27 +112,27 @@ public class SubscriptionEventForwarder {
updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
}
if (dmisToRespond.isEmpty()) {
- final String clientID = subscriptionEvent.getData().getSubscription().getClientID();
- final String subscriptionName = subscriptionEvent.getData().getSubscription().getName();
- subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse,
+ "subscriptionCreatedStatus");
} else {
- startResponseTimeout(subscriptionEvent, dmisToRespond);
+ startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond);
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent =
clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent);
- forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType);
}
}
- private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
- final String subscriptionClientId = subscriptionEvent.getData().getSubscription().getClientID();
- final String subscriptionName = subscriptionEvent.getData().getSubscription().getName();
+ private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse,
+ final Set<String> dmisToRespond) {
+ final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
final ResponseTimeoutTask responseTimeoutTask =
new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome,
- subscriptionClientId, subscriptionName);
+ emptySubscriptionEventResponse);
try {
executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
} catch (final RuntimeException ex) {
@@ -135,7 +143,7 @@ public class SubscriptionEventForwarder {
private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
- ncmpSubscriptionEvent) {
+ ncmpSubscriptionEvent, final String eventType) {
dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map(
cmHandleAndProperties -> {
@@ -150,7 +158,7 @@ public class SubscriptionEventForwarder {
final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName;
final CloudEvent ncmpSubscriptionCloudEvent =
- SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey);
+ SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType);
eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent);
});
}
@@ -182,6 +190,7 @@ public class SubscriptionEventForwarder {
return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream()
.filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId()))
.map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(),
- SubscriptionStatus.REJECTED)).collect(Collectors.toList());
+ SubscriptionStatus.REJECTED, "Targets not found"))
+ .collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java
index bf9ceb1c3..35d94cc7a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java
@@ -25,7 +25,6 @@ import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
@@ -47,8 +46,7 @@ public interface SubscriptionEventMapper {
*/
@Named("mapTargetsToCmHandleTargets")
default List<YangModelSubscriptionEvent.TargetCmHandle> mapTargetsToCmHandleTargets(List<String> targets) {
- return targets.stream().map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target,
- SubscriptionStatus.PENDING))
+ return targets.stream().map(YangModelSubscriptionEvent.TargetCmHandle::new)
.collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
index 20df706c0..ddb9fd6fc 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import com.hazelcast.map.IMap;
+import io.cloudevents.CloudEvent;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -32,8 +33,9 @@ import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEven
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.spi.model.DataNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
@@ -61,19 +63,21 @@ public class SubscriptionEventResponseConsumer {
* @param subscriptionEventResponseConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEventResponse(
- final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
- final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
- final String clientId = subscriptionEventResponse.getClientId();
+ final ConsumerRecord<String, CloudEvent> subscriptionEventResponseConsumerRecord) {
+ final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value();
+ final String eventType = subscriptionEventResponseConsumerRecord.value().getType();
+ final SubscriptionEventResponse subscriptionEventResponse =
+ SubscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent);
+ final String clientId = subscriptionEventResponse.getData().getClientId();
log.info("subscription event response of clientId: {} is received.", clientId);
- final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = clientId + subscriptionName;
boolean createOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
-
- dmiNames.remove(subscriptionEventResponse.getDmiName());
+ dmiNames.remove(subscriptionEventResponse.getData().getDmiName());
forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
@@ -84,7 +88,7 @@ public class SubscriptionEventResponseConsumer {
if (createOutcomeResponse
&& notificationFeatureEnabled
&& hasNoPendingCmHandles(clientId, subscriptionName)) {
- subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType);
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
@@ -92,10 +96,15 @@ public class SubscriptionEventResponseConsumer {
private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) {
final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent(
clientId, subscriptionName);
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes(
- dataNodeSubscription);
- return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal =
+ DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription);
+ for (final Map<String, String> statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) {
+ final String status = statusAndDetailsMap.get("status");
+ if (SubscriptionStatus.PENDING.toString().equals(status)) {
+ return false;
+ }
+ }
+ return true;
}
private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java
index 44181c57c..dc122ee5d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java
@@ -21,36 +21,35 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
@Mapper(componentModel = "spring")
public interface SubscriptionEventResponseMapper {
- @Mapping(source = "clientId", target = "clientId")
- @Mapping(source = "subscriptionName", target = "subscriptionName")
- @Mapping(source = "cmHandleIdToStatus", target = "predicates.targetCmHandles",
- qualifiedByName = "mapStatusToCmHandleTargets")
+ @Mapping(source = "data.clientId", target = "clientId")
+ @Mapping(source = "data.subscriptionName", target = "subscriptionName")
+ @Mapping(source = "data.subscriptionStatus", target = "predicates.targetCmHandles",
+ qualifiedByName = "mapSubscriptionStatusToCmHandleTargets")
YangModelSubscriptionEvent toYangModelSubscriptionEvent(
SubscriptionEventResponse subscriptionEventResponse);
/**
- * Maps StatusToCMHandle to list of TargetCmHandle.
+ * Maps SubscriptionStatus to list of TargetCmHandle.
*
- * @param targets as a map
+ * @param subscriptionStatus as a list
* @return TargetCmHandle list
*/
- @Named("mapStatusToCmHandleTargets")
- default List<YangModelSubscriptionEvent.TargetCmHandle> mapStatusToCmHandleTargets(
- Map<String, SubscriptionStatus> targets) {
- return targets.entrySet().stream().map(target ->
- new YangModelSubscriptionEvent.TargetCmHandle(target.getKey(), target.getValue())).collect(
- Collectors.toList());
+ @Named("mapSubscriptionStatusToCmHandleTargets")
+ default List<YangModelSubscriptionEvent.TargetCmHandle> mapSubscriptionStatusToCmHandleTargets(
+ List<SubscriptionStatus> subscriptionStatus) {
+ return subscriptionStatus.stream().map(status -> new YangModelSubscriptionEvent.TargetCmHandle(status.getId(),
+ org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus.fromString(status.getStatus().value()),
+ status.getDetails())).collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
index 8fdff1794..9ed686529 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
@@ -20,21 +20,20 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
-import java.io.Serializable;
-import java.util.Collection;
+import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.onap.cps.ncmp.api.NcmpEventResponseCode;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome;
-import org.onap.cps.spi.model.DataNode;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -45,75 +44,106 @@ public class SubscriptionEventResponseOutcome {
private final SubscriptionPersistence subscriptionPersistence;
- private final EventsPublisher<SubscriptionEventOutcome> outcomeEventsPublisher;
+ private final EventsPublisher<CloudEvent> outcomeEventsPublisher;
private final SubscriptionOutcomeMapper subscriptionOutcomeMapper;
- @Value("${app.ncmp.avc.subscription-outcome-topic:cm-avc-subscription-response}")
+ @Value("${app.ncmp.avc.subscription-outcome-topic:subscription-response}")
private String subscriptionOutcomeEventTopic;
/**
* This is for construction of outcome message to be published for client apps.
*
- * @param subscriptionClientId client id of the subscription.
- * @param subscriptionName name of the subscription.
+ * @param subscriptionEventResponse event produced by Dmi Plugin
*/
- public void sendResponse(final String subscriptionClientId, final String subscriptionName) {
- final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse(
- subscriptionClientId, subscriptionName);
- final Headers headers = new RecordHeaders();
+ public void sendResponse(final SubscriptionEventResponse subscriptionEventResponse, final String eventKey) {
+ final SubscriptionEventOutcome subscriptionEventOutcome =
+ formSubscriptionOutcomeMessage(subscriptionEventResponse);
+ final String subscriptionClientId = subscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
- outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic,
- subscriptionEventId, headers, subscriptionEventOutcome);
+ final CloudEvent subscriptionOutcomeCloudEvent =
+ SubscriptionOutcomeCloudMapper.toCloudEvent(subscriptionEventOutcome,
+ subscriptionEventId, eventKey);
+ outcomeEventsPublisher.publishCloudEvent(subscriptionOutcomeEventTopic,
+ subscriptionEventId, subscriptionOutcomeCloudEvent);
}
- private SubscriptionEventOutcome generateResponse(final String subscriptionClientId,
- final String subscriptionName) {
- final Collection<DataNode> dataNodes =
- subscriptionPersistence.getCmHandlesForSubscriptionEvent(subscriptionClientId, subscriptionName);
- final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
- final List<Collection<Serializable>> cmHandleIdToStatus =
- DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
- return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName,
- isFullOutcomeResponse(cmHandleIdToStatusMap));
+ private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
+ final SubscriptionEventResponse subscriptionEventResponse) {
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap =
+ DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(
+ subscriptionPersistence.getCmHandlesForSubscriptionEvent(
+ subscriptionEventResponse.getData().getClientId(),
+ subscriptionEventResponse.getData().getSubscriptionName()));
+ final List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
+ subscriptionStatusList = mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(
+ cmHandleIdToStatusAndDetailsAsMap);
+ subscriptionEventResponse.getData().setSubscriptionStatus(subscriptionStatusList);
+ return fromSubscriptionEventResponse(subscriptionEventResponse,
+ decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap));
}
- private boolean isFullOutcomeResponse(final Map<String, SubscriptionStatus> cmHandleIdToStatusMap) {
- return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ private static List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
+ mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) {
+ return cmHandleIdToStatusAndDetailsAsMap.entrySet()
+ .stream().map(entryset -> {
+ final org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
+ subscriptionStatus = new org.onap.cps.ncmp.events.avcsubscription1_0_0
+ .dmi_to_ncmp.SubscriptionStatus();
+ final String cmHandleId = entryset.getKey();
+ final Map<String, String> statusAndDetailsMap = entryset.getValue();
+ final String status = statusAndDetailsMap.get("status");
+ final String details = statusAndDetailsMap.get("details");
+ subscriptionStatus.setId(cmHandleId);
+ subscriptionStatus.setStatus(
+ org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp
+ .SubscriptionStatus.Status.fromValue(status));
+ subscriptionStatus.setDetails(details);
+ return subscriptionStatus;
+ }).collect(Collectors.toList());
}
- private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
- final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
- final String subscriptionName, final boolean isFullOutcomeResponse) {
+ private NcmpEventResponseCode decideOnNcmpEventResponseCodeForSubscription(
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) {
- final SubscriptionEventResponse subscriptionEventResponse = toSubscriptionEventResponse(
- cmHandleIdToStatus, subscriptionClientId, subscriptionName);
+ final boolean isAllTargetsPending = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap,
+ SubscriptionStatus.PENDING);
- final SubscriptionEventOutcome subscriptionEventOutcome =
- subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse);
+ final boolean isAllTargetsRejected = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap,
+ SubscriptionStatus.REJECTED);
+
+ final boolean isAllTargetsAccepted = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap,
+ SubscriptionStatus.ACCEPTED);
- if (isFullOutcomeResponse) {
- subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME);
+ if (isAllTargetsAccepted) {
+ return NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION;
+ } else if (isAllTargetsRejected) {
+ return NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE;
+ } else if (isAllTargetsPending) {
+ return NcmpEventResponseCode.SUBSCRIPTION_PENDING;
} else {
- subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME);
+ return NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION;
}
+ }
- return subscriptionEventOutcome;
+ private boolean isAllTargetCmHandleStatusMatch(
+ final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap,
+ final SubscriptionStatus subscriptionStatus) {
+ return cmHandleIdToStatusAndDetailsAsMap.values().stream()
+ .allMatch(entryset -> entryset.containsValue(subscriptionStatus.toString()));
}
- private SubscriptionEventResponse toSubscriptionEventResponse(
- final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
- final String subscriptionName) {
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
- DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
+ private SubscriptionEventOutcome fromSubscriptionEventResponse(
+ final SubscriptionEventResponse subscriptionEventResponse,
+ final NcmpEventResponseCode ncmpEventResponseCode) {
- final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
- subscriptionEventResponse.setClientId(subscriptionClientId);
- subscriptionEventResponse.setSubscriptionName(subscriptionName);
- subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap);
+ final SubscriptionEventOutcome subscriptionEventOutcome =
+ subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse);
+ subscriptionEventOutcome.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode()));
+ subscriptionEventOutcome.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage());
- return subscriptionEventResponse;
+ return subscriptionEventOutcome;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java
index cecde5f81..7803b982f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java
@@ -26,63 +26,80 @@ import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfo;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.spi.exceptions.DataValidationException;
@Mapper(componentModel = "spring")
public interface SubscriptionOutcomeMapper {
- @Mapping(source = "clientId", target = "event.subscription.clientID")
- @Mapping(source = "subscriptionName", target = "event.subscription.name")
- @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets",
- qualifiedByName = "mapStatusToCmHandleRejected")
- @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets",
- qualifiedByName = "mapStatusToCmHandleAccepted")
- @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets",
- qualifiedByName = "mapStatusToCmHandlePending")
- SubscriptionEventOutcome toSubscriptionEventOutcome(
- SubscriptionEventResponse subscriptionEventResponse);
+ @Mapping(source = "data.subscriptionStatus", target = "data.additionalInfo",
+ qualifiedByName = "mapListOfSubscriptionStatusToAdditionalInfo")
+ SubscriptionEventOutcome toSubscriptionEventOutcome(SubscriptionEventResponse subscriptionEventResponse);
/**
- * Maps StatusToCMHandle to list of TargetCmHandle rejected.
+ * Maps list of SubscriptionStatus to an AdditionalInfo.
*
- * @param targets as a map
- * @return TargetCmHandle list
+ * @param subscriptionStatusList containing details
+ * @return an AdditionalInfo
*/
- @Named("mapStatusToCmHandleRejected")
- default List<Object> mapStatusToCmHandleRejected(Map<String, SubscriptionStatus> targets) {
- return targets.entrySet()
- .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ @Named("mapListOfSubscriptionStatusToAdditionalInfo")
+ default AdditionalInfo mapListOfSubscriptionStatusToAdditionalInfo(
+ final List<SubscriptionStatus> subscriptionStatusList) {
+ if (subscriptionStatusList == null || subscriptionStatusList.isEmpty()) {
+ throw new DataValidationException("Invalid subscriptionStatusList",
+ "SubscriptionStatus list cannot be null or empty");
+ }
+
+ final Map<String, List<SubscriptionStatus>> rejectedSubscriptionsPerDetails = getSubscriptionsPerDetails(
+ subscriptionStatusList, SubscriptionStatus.Status.REJECTED);
+ final Map<String, List<String>> rejectedCmHandlesPerDetails =
+ getCmHandlesPerDetails(rejectedSubscriptionsPerDetails);
+ final List<AdditionalInfoDetail> rejectedCmHandles = getAdditionalInfoDetailList(rejectedCmHandlesPerDetails);
+
+
+ final Map<String, List<SubscriptionStatus>> pendingSubscriptionsPerDetails = getSubscriptionsPerDetails(
+ subscriptionStatusList, SubscriptionStatus.Status.PENDING);
+ final Map<String, List<String>> pendingCmHandlesPerDetails =
+ getCmHandlesPerDetails(pendingSubscriptionsPerDetails);
+ final List<AdditionalInfoDetail> pendingCmHandles = getAdditionalInfoDetailList(pendingCmHandlesPerDetails);
+
+ final AdditionalInfo additionalInfo = new AdditionalInfo();
+ additionalInfo.setRejected(rejectedCmHandles);
+ additionalInfo.setPending(pendingCmHandles);
+
+ return additionalInfo;
}
- /**
- * Maps StatusToCMHandle to list of TargetCmHandle accepted.
- *
- * @param targets as a map
- * @return TargetCmHandle list
- */
- @Named("mapStatusToCmHandleAccepted")
- default List<Object> mapStatusToCmHandleAccepted(Map<String, SubscriptionStatus> targets) {
- return targets.entrySet()
- .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ private static Map<String, List<SubscriptionStatus>> getSubscriptionsPerDetails(
+ final List<SubscriptionStatus> subscriptionStatusList, final SubscriptionStatus.Status status) {
+ return subscriptionStatusList.stream()
+ .filter(subscriptionStatus -> subscriptionStatus.getStatus() == status)
+ .collect(Collectors.groupingBy(SubscriptionStatus::getDetails));
}
- /**
- * Maps StatusToCMHandle to list of TargetCmHandle pending.
- *
- * @param targets as a map
- * @return TargetCmHandle list
- */
- @Named("mapStatusToCmHandlePending")
- default List<Object> mapStatusToCmHandlePending(Map<String, SubscriptionStatus> targets) {
- return targets.entrySet()
- .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ private static Map<String, List<String>> getCmHandlesPerDetails(
+ final Map<String, List<SubscriptionStatus>> subscriptionsPerDetails) {
+ return subscriptionsPerDetails.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().stream()
+ .map(SubscriptionStatus::getId)
+ .collect(Collectors.toList())
+ ));
+ }
+
+ private static List<AdditionalInfoDetail> getAdditionalInfoDetailList(
+ final Map<String, List<String>> cmHandlesPerDetails) {
+ return cmHandlesPerDetails.entrySet().stream()
+ .map(entry -> {
+ final AdditionalInfoDetail detail = new AdditionalInfoDetail();
+ detail.setDetails(entry.getKey());
+ detail.setTargets(entry.getValue());
+ return detail;
+ }).collect(Collectors.toList());
}
}