summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java39
1 files changed, 24 insertions, 15 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 1fe963a27..f196cb01e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -44,6 +44,8 @@ import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
import org.springframework.beans.factory.annotation.Value;
@@ -74,7 +76,7 @@ public class SubscriptionEventForwarder {
*
* @param subscriptionEvent the event to be forwarded
*/
- public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, final String eventType) {
final List<String> cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets();
if (cmHandleTargets == null || cmHandleTargets.isEmpty()
|| cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) {
@@ -85,13 +87,19 @@ public class SubscriptionEventForwarder {
inventoryPersistence.getYangModelCmHandles(cmHandleTargets);
final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
= DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
- findDmisAndRespond(subscriptionEvent, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
+ findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
}
- private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent,
+ private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType,
final List<String> cmHandleTargetsAsStrings,
final Map<String, Map<String, Map<String, String>>>
dmiPropertiesPerCmHandleIdPerServiceName) {
+ final SubscriptionEventResponse emptySubscriptionEventResponse =
+ new SubscriptionEventResponse().withData(new Data());
+ emptySubscriptionEventResponse.getData().setSubscriptionName(
+ subscriptionEvent.getData().getSubscription().getName());
+ emptySubscriptionEventResponse.getData().setClientId(
+ subscriptionEvent.getData().getSubscription().getClientID());
final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
.map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
@@ -104,27 +112,27 @@ public class SubscriptionEventForwarder {
updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
}
if (dmisToRespond.isEmpty()) {
- final String clientID = subscriptionEvent.getData().getSubscription().getClientID();
- final String subscriptionName = subscriptionEvent.getData().getSubscription().getName();
- subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName);
+ subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse,
+ "subscriptionCreatedStatus");
} else {
- startResponseTimeout(subscriptionEvent, dmisToRespond);
+ startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond);
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent =
clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent);
- forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType);
}
}
- private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
- final String subscriptionClientId = subscriptionEvent.getData().getSubscription().getClientID();
- final String subscriptionName = subscriptionEvent.getData().getSubscription().getName();
+ private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse,
+ final Set<String> dmisToRespond) {
+ final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
final ResponseTimeoutTask responseTimeoutTask =
new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome,
- subscriptionClientId, subscriptionName);
+ emptySubscriptionEventResponse);
try {
executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
} catch (final RuntimeException ex) {
@@ -135,7 +143,7 @@ public class SubscriptionEventForwarder {
private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
- ncmpSubscriptionEvent) {
+ ncmpSubscriptionEvent, final String eventType) {
dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map(
cmHandleAndProperties -> {
@@ -150,7 +158,7 @@ public class SubscriptionEventForwarder {
final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName;
final CloudEvent ncmpSubscriptionCloudEvent =
- SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey);
+ SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType);
eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent);
});
}
@@ -182,6 +190,7 @@ public class SubscriptionEventForwarder {
return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream()
.filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId()))
.map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(),
- SubscriptionStatus.REJECTED)).collect(Collectors.toList());
+ SubscriptionStatus.REJECTED, "Targets not found"))
+ .collect(Collectors.toList());
}
}