diff options
Diffstat (limited to 'cps-ncmp-service')
5 files changed, 95 insertions, 23 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java index 76ee08e64c..e7af69e500 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java @@ -34,7 +34,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Value; @@ -55,8 +54,8 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { private final EventsPublisher<CloudEvent> eventsPublisher; private final JsonObjectMapper jsonObjectMapper; - private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache; private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper; + private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>(); @@ -72,8 +71,9 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { * or published now */ public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent, - final boolean isScheduledEvent) { + final CmNotificationSubscriptionNcmpOutEvent + cmNotificationSubscriptionNcmpOutEvent, + final boolean isScheduledEvent) { if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) { final ScheduledFuture<?> scheduledFuture = @@ -86,16 +86,15 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { cmNotificationSubscriptionNcmpOutEvent); log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId); } - } private ScheduledFuture<?> scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, - final String eventType) { + final String eventType) { final CmNotificationSubscriptionNcmpOutEventPublishingTask cmNotificationSubscriptionNcmpOutEventPublishingTask = new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic, - subscriptionId, eventType, eventsPublisher, jsonObjectMapper, cmNotificationSubscriptionCache, - cmNotificationSubscriptionNcmpOutEventMapper); + subscriptionId, eventType, eventsPublisher, jsonObjectMapper, + cmNotificationSubscriptionNcmpOutEventMapper, dmiCmNotificationSubscriptionCacheHandler); return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask, cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS); } @@ -112,12 +111,15 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) { + final CmNotificationSubscriptionNcmpOutEvent + cmNotificationSubscriptionNcmpOutEvent) { final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent = buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent); eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId, cmNotificationSubscriptionNcmpOutEventAsCloudEvent); + dmiCmNotificationSubscriptionCacheHandler + .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId); } protected static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent( @@ -125,9 +127,9 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) { return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType) - .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0")) - .withExtension("correlationid", subscriptionId) - .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build(); + .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0")) + .withExtension("correlationid", subscriptionId) + .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build(); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java index f7ea4a465c..b2c0a5cfb3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java @@ -42,8 +42,8 @@ public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Run private final String eventType; private final EventsPublisher<CloudEvent> eventsPublisher; private final JsonObjectMapper jsonObjectMapper; - private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache; private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper; + private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; /** * Delegating the responsibility of publishing CmNotificationSubscriptionNcmpOutEvent as a separate task which will @@ -52,13 +52,15 @@ public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Run @Override public void run() { final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap = - cmNotificationSubscriptionCache.get(subscriptionId); + dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId); final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent = cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId, dmiCmNotificationSubscriptionDetailsMap); eventsPublisher.publishCloudEvent(topicName, subscriptionId, buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent)); + dmiCmNotificationSubscriptionCacheHandler + .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java index 4b3a085147..34ffb5e19d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus; import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; @@ -51,7 +52,7 @@ public class DmiCmNotificationSubscriptionCacheHandler { /** * Adds new subscription to the subscription cache. * - * @param subscriptionId subscription Id + * @param subscriptionId subscription id * @param predicates subscription request predicates */ public void add(final String subscriptionId, final List<Predicate> predicates) { @@ -59,6 +60,33 @@ public class DmiCmNotificationSubscriptionCacheHandler { } /** + * Get cm notification subscription cache entry via subscription id. + * + * @param subscriptionId subscription id + * @return map of dmi cm notification subscriptions per dmi + */ + public Map<String, DmiCmNotificationSubscriptionDetails> get(final String subscriptionId) { + return cmNotificationSubscriptionCache.get(subscriptionId); + } + + + /** + * Remove cache entries with CmNotificationSubscriptionStatus ACCEPTED/REJECTED via subscription id. + * + * @param subscriptionId subscription id as key in CM notification Subscription cache. + */ + public void removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(final String subscriptionId) { + final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionsPerDmi = + cmNotificationSubscriptionCache.get(subscriptionId); + final Map<String, DmiCmNotificationSubscriptionDetails> updatedDmiCmNotificationSubscriptionsPerDmi = + dmiCmNotificationSubscriptionsPerDmi.entrySet().stream().filter( + dmiCmNotificationSubscription -> + !isAcceptedOrRejected(dmiCmNotificationSubscription.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiCmNotificationSubscriptionsPerDmi); + } + + /** * Creates map of subscription details per DMI. * * @param predicates CM Subscription Create Request Predicates @@ -95,9 +123,9 @@ public class DmiCmNotificationSubscriptionCacheHandler { * */ public void updateDmiCmNotificationSubscriptionStatusPerDmi( - final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) { + final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) { cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) - .setCmNotificationSubscriptionStatus(status); + .setCmNotificationSubscriptionStatus(status); } /** @@ -109,10 +137,10 @@ public class DmiCmNotificationSubscriptionCacheHandler { */ public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) { final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList = - cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) - .getDmiCmNotificationSubscriptionPredicates(); + cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) + .getDmiCmNotificationSubscriptionPredicates(); for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate: - dmiCmNotificationSubscriptionPredicateList) { + dmiCmNotificationSubscriptionPredicateList) { final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType(); final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds(); final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths(); @@ -153,4 +181,10 @@ public class DmiCmNotificationSubscriptionCacheHandler { } return targetCmHandlesByDmiServiceNames; } + + private boolean isAcceptedOrRejected( + final DmiCmNotificationSubscriptionDetails dmiCmNotificationSubscription) { + return dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("ACCEPTED") + || dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("REJECTED"); + } }
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy index 970d7e67b0..0f1bdc65b7 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import org.onap.cps.events.EventsPublisher import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data @@ -15,10 +14,11 @@ class CmNotificationSubscriptionNcmpOutEventProducerSpec extends Specification { def mockEventsPublisher = Mock(EventsPublisher) def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) - def mockCmNotificationSubscriptionCache = Mock(Map<String, Map<String, DmiCmNotificationSubscriptionDetails>>) def mockCmNotificationSubscriptionNcmpOutEventMapper = Mock(CmNotificationSubscriptionNcmpOutEventMapper) + def mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler) - def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper, mockCmNotificationSubscriptionCache, mockCmNotificationSubscriptionNcmpOutEventMapper) + def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper, + mockCmNotificationSubscriptionNcmpOutEventMapper, mockDmiCmNotificationSubscriptionCacheHandler) def 'Create and #scenario Cm Notification Subscription NCMP out event'() { given: 'a cm subscription response for the client' @@ -80,6 +80,8 @@ class CmNotificationSubscriptionNcmpOutEventProducerSpec extends Specification { assert CloudEventMapper.toTargetEvent(cmNotificationSubscriptionNcmpOutEventAsCloudEvent, CmNotificationSubscriptionNcmpOutEvent) == cmNotificationSubscriptionNcmpOutEvent } } + then: 'the cache handler is called once to remove accepted and rejected entries in cache' + 1 * mockDmiCmNotificationSubscriptionCacheHandler.removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy index 10e060fee6..43568be501 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy @@ -25,6 +25,7 @@ import io.cloudevents.CloudEvent import io.cloudevents.core.builder.CloudEventBuilder import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus +import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle @@ -75,6 +76,37 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec { assert testCache.containsKey(subscriptionId) } + def 'Get cache entry via subscription id'() { + given: 'the cache contains value for some-id' + testCache.put('some-id',[:]) + when: 'the get method is called' + def result = objectUnderTest.get('some-id') + then: 'correct value is returned as expected' + assert result == [:] + } + + def 'Remove accepted and rejected entries from cache via subscription id'() { + given: 'a map as the value for cache entry for some-id' + def testMap = [:] + testMap.put("dmi-1", + new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.ACCEPTED)) + testMap.put("dmi-2", + new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.REJECTED)) + testMap.put("dmi-3", + new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.PENDING)) + testCache.put("test-id", testMap) + assert testCache.get("test-id").size() == 3 + when: 'the method to remove accepted and rejected entries for test-id is called' + objectUnderTest.removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries("test-id") + then: 'all entries with status accepted/rejected are no longer present for test-id' + testCache.get("test-id").each { key, testResultMap -> + assert testResultMap.cmNotificationSubscriptionStatus != CmNotificationSubscriptionStatus.ACCEPTED + || testResultMap.cmNotificationSubscriptionStatus != CmNotificationSubscriptionStatus.REJECTED + } + and: 'the size of the map for cache entry test-id is as expected' + assert testCache.get("test-id").size() == 1 + } + def 'Create map for DMI cm notification subscription per DMI service name'() { given: 'list of predicates from the create subscription event' def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates() |