diff options
3 files changed, 40 insertions, 16 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java index c2c71dbaae..1cdc7ed3e0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java @@ -154,7 +154,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { dmiSubscriptionDetails.getDmiCmSubscriptionPredicates()); if (dmiCmSubscriptionPredicates.isEmpty()) { - acceptAndPublishNcmpOutEventPerDmi(subscriptionId, dmiPluginName); + acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName); } else { publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates); } @@ -168,7 +168,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { "subscriptionCreateRequest", dmiInEvent); } - private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) { + private void acceptAndPersistCmSubscriptionPerDmi(final String subscriptionId, final String dmiPluginName) { dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED); dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java index 01d720937f..3371d59f7a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java @@ -57,7 +57,8 @@ public class NcmpOutEventProducer { private final NcmpOutEventMapper ncmpOutEventMapper; private final DmiCacheHandler dmiCacheHandler; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>(); + private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionIdAndEventType = + new ConcurrentHashMap<>(); /** * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud @@ -73,14 +74,20 @@ public class NcmpOutEventProducer { public void publishNcmpOutEvent(final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) { - if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) { + final String taskKey = subscriptionId.concat(eventType); + + if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) { final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType); - scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture); - log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId); + scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture); + log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId, + eventType); } else { - cancelScheduledTaskForSubscriptionId(subscriptionId); - publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent); - log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId); + cancelScheduledTask(taskKey); + if (ncmpOutEvent != null) { + publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent); + log.debug("Published Cm Subscription Event on demand for subscriptionId : {} and eventType : {}", + subscriptionId, eventType); + } } } @@ -92,12 +99,12 @@ public class NcmpOutEventProducer { TimeUnit.MILLISECONDS); } - private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) { + private void cancelScheduledTask(final String taskKey) { - final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId); + final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionIdAndEventType.get(taskKey); if (scheduledFuture != null) { scheduledFuture.cancel(true); - scheduledTasksPerSubscriptionId.remove(subscriptionId); + scheduledTasksPerSubscriptionIdAndEventType.remove(taskKey); } } @@ -106,10 +113,8 @@ public class NcmpOutEventProducer { private void publishNcmpOutEventNow(final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) { final CloudEvent ncmpOutEventAsCloudEvent = - buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, - ncmpOutEvent); - eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, - ncmpOutEventAsCloudEvent); + buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, ncmpOutEvent); + eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent); dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy index e03682d8c9..afa2e9874e 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy @@ -83,5 +83,24 @@ class NcmpOutEventProducerSpec extends Specification { 1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId) } + def 'No event published when NCMP out event is null'() { + given: 'a cm subscription response for the client' + def subscriptionId = 'test-subscription-id-3' + def eventType = 'subscriptionCreateResponse' + def ncmpOutEvent = null + and: 'also we have target topic for publishing to client' + objectUnderTest.ncmpOutEventTopic = 'client-test-topic' + and: 'a deadline to an event' + objectUnderTest.dmiOutEventTimeoutInMs = 1000 + when: 'the event is scheduled to be published' + objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true) + then: 'we wait for 10ms and then we receive response from DMI' + Thread.sleep(10) + and: 'we receive NO response from DMI so we publish the message on demand' + objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false) + and: 'no event published' + 0 * mockEventsPublisher.publishCloudEvent(*_) + } + } |