aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java46
1 files changed, 45 insertions, 1 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 9e363f3cd..1d87a057a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import com.hazelcast.map.IMap;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -37,8 +38,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.event.model.SubscriptionEvent;
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
@@ -55,6 +59,8 @@ public class SubscriptionEventForwarder {
private final EventsPublisher<SubscriptionEvent> eventsPublisher;
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
+ private final SubscriptionEventMapper subscriptionEventMapper;
+ private final SubscriptionPersistence subscriptionPersistence;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@Value("${app.ncmp.avc.subscription-forward-topic-prefix}")
private String dmiAvcSubscriptionTopicPrefix;
@@ -83,11 +89,29 @@ public class SubscriptionEventForwarder {
final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
= DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
+ findDmisAndRespond(subscriptionEvent, eventHeaders, cmHandleTargetsAsStrings,
+ dmiPropertiesPerCmHandleIdPerServiceName);
+ }
+
+ private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final Headers eventHeaders,
+ final List<String> cmHandleTargetsAsStrings,
+ final Map<String, Map<String, Map<String, String>>>
+ dmiPropertiesPerCmHandleIdPerServiceName) {
+ final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
+ .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
+
+ final List<String> targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings);
+ targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb);
+
final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
+
+ if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) {
+ updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
+ }
if (dmisToRespond.isEmpty()) {
final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID();
final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName();
- subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName, true);
+ subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName);
} else {
startResponseTimeout(subscriptionEvent, dmisToRespond);
forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders);
@@ -130,4 +154,24 @@ public class SubscriptionEventForwarder {
+ "-"
+ dmiName;
}
+
+ private void updatesCmHandlesToRejectedAndPersistSubscriptionEvent(
+ final SubscriptionEvent subscriptionEvent,
+ final List<String> targetCmHandlesDoesNotExistInDb) {
+ final YangModelSubscriptionEvent yangModelSubscriptionEvent =
+ subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent);
+ yangModelSubscriptionEvent.getPredicates()
+ .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb,
+ yangModelSubscriptionEvent));
+ subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);
+ }
+
+ private static List<YangModelSubscriptionEvent.TargetCmHandle> findRejectedCmHandles(
+ final List<String> targetCmHandlesDoesNotExistInDb,
+ final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
+ return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream()
+ .filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId()))
+ .map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(),
+ SubscriptionStatus.REJECTED)).collect(Collectors.toList());
+ }
}