aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java26
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java46
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy8
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy32
5 files changed, 95 insertions, 23 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java
index 76ee08e64c..e7af69e500 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java
@@ -34,7 +34,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.events.EventsPublisher;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
@@ -55,8 +54,8 @@ public class CmNotificationSubscriptionNcmpOutEventProducer {
private final EventsPublisher<CloudEvent> eventsPublisher;
private final JsonObjectMapper jsonObjectMapper;
- private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper;
+ private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
@@ -72,8 +71,9 @@ public class CmNotificationSubscriptionNcmpOutEventProducer {
* or published now
*/
public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent,
- final boolean isScheduledEvent) {
+ final CmNotificationSubscriptionNcmpOutEvent
+ cmNotificationSubscriptionNcmpOutEvent,
+ final boolean isScheduledEvent) {
if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
final ScheduledFuture<?> scheduledFuture =
@@ -86,16 +86,15 @@ public class CmNotificationSubscriptionNcmpOutEventProducer {
cmNotificationSubscriptionNcmpOutEvent);
log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
}
-
}
private ScheduledFuture<?> scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final String eventType) {
+ final String eventType) {
final CmNotificationSubscriptionNcmpOutEventPublishingTask
cmNotificationSubscriptionNcmpOutEventPublishingTask =
new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic,
- subscriptionId, eventType, eventsPublisher, jsonObjectMapper, cmNotificationSubscriptionCache,
- cmNotificationSubscriptionNcmpOutEventMapper);
+ subscriptionId, eventType, eventsPublisher, jsonObjectMapper,
+ cmNotificationSubscriptionNcmpOutEventMapper, dmiCmNotificationSubscriptionCacheHandler);
return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask,
cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS);
}
@@ -112,12 +111,15 @@ public class CmNotificationSubscriptionNcmpOutEventProducer {
private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
+ final CmNotificationSubscriptionNcmpOutEvent
+ cmNotificationSubscriptionNcmpOutEvent) {
final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent =
buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
cmNotificationSubscriptionNcmpOutEvent);
eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId,
cmNotificationSubscriptionNcmpOutEventAsCloudEvent);
+ dmiCmNotificationSubscriptionCacheHandler
+ .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId);
}
protected static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent(
@@ -125,9 +127,9 @@ public class CmNotificationSubscriptionNcmpOutEventProducer {
final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
- .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
- .withExtension("correlationid", subscriptionId)
- .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build();
+ .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
+ .withExtension("correlationid", subscriptionId)
+ .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build();
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
index f7ea4a465c..b2c0a5cfb3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
@@ -42,8 +42,8 @@ public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Run
private final String eventType;
private final EventsPublisher<CloudEvent> eventsPublisher;
private final JsonObjectMapper jsonObjectMapper;
- private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper;
+ private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
/**
* Delegating the responsibility of publishing CmNotificationSubscriptionNcmpOutEvent as a separate task which will
@@ -52,13 +52,15 @@ public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Run
@Override
public void run() {
final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- cmNotificationSubscriptionCache.get(subscriptionId);
+ dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
dmiCmNotificationSubscriptionDetailsMap);
eventsPublisher.publishCloudEvent(topicName, subscriptionId,
buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
cmNotificationSubscriptionNcmpOutEvent));
+ dmiCmNotificationSubscriptionCacheHandler
+ .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
index 4b3a085147..34ffb5e19d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
@@ -51,7 +52,7 @@ public class DmiCmNotificationSubscriptionCacheHandler {
/**
* Adds new subscription to the subscription cache.
*
- * @param subscriptionId subscription Id
+ * @param subscriptionId subscription id
* @param predicates subscription request predicates
*/
public void add(final String subscriptionId, final List<Predicate> predicates) {
@@ -59,6 +60,33 @@ public class DmiCmNotificationSubscriptionCacheHandler {
}
/**
+ * Get cm notification subscription cache entry via subscription id.
+ *
+ * @param subscriptionId subscription id
+ * @return map of dmi cm notification subscriptions per dmi
+ */
+ public Map<String, DmiCmNotificationSubscriptionDetails> get(final String subscriptionId) {
+ return cmNotificationSubscriptionCache.get(subscriptionId);
+ }
+
+
+ /**
+ * Remove cache entries with CmNotificationSubscriptionStatus ACCEPTED/REJECTED via subscription id.
+ *
+ * @param subscriptionId subscription id as key in CM notification Subscription cache.
+ */
+ public void removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(final String subscriptionId) {
+ final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionsPerDmi =
+ cmNotificationSubscriptionCache.get(subscriptionId);
+ final Map<String, DmiCmNotificationSubscriptionDetails> updatedDmiCmNotificationSubscriptionsPerDmi =
+ dmiCmNotificationSubscriptionsPerDmi.entrySet().stream().filter(
+ dmiCmNotificationSubscription ->
+ !isAcceptedOrRejected(dmiCmNotificationSubscription.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiCmNotificationSubscriptionsPerDmi);
+ }
+
+ /**
* Creates map of subscription details per DMI.
*
* @param predicates CM Subscription Create Request Predicates
@@ -95,9 +123,9 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
*/
public void updateDmiCmNotificationSubscriptionStatusPerDmi(
- final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) {
+ final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) {
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .setCmNotificationSubscriptionStatus(status);
+ .setCmNotificationSubscriptionStatus(status);
}
/**
@@ -109,10 +137,10 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*/
public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
- cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates();
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .getDmiCmNotificationSubscriptionPredicates();
for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicateList) {
+ dmiCmNotificationSubscriptionPredicateList) {
final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
@@ -153,4 +181,10 @@ public class DmiCmNotificationSubscriptionCacheHandler {
}
return targetCmHandlesByDmiServiceNames;
}
+
+ private boolean isAcceptedOrRejected(
+ final DmiCmNotificationSubscriptionDetails dmiCmNotificationSubscription) {
+ return dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("ACCEPTED")
+ || dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("REJECTED");
+ }
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy
index 970d7e67b0..0f1bdc65b7 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import org.onap.cps.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper
import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent
import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data
@@ -15,10 +14,11 @@ class CmNotificationSubscriptionNcmpOutEventProducerSpec extends Specification {
def mockEventsPublisher = Mock(EventsPublisher)
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
- def mockCmNotificationSubscriptionCache = Mock(Map<String, Map<String, DmiCmNotificationSubscriptionDetails>>)
def mockCmNotificationSubscriptionNcmpOutEventMapper = Mock(CmNotificationSubscriptionNcmpOutEventMapper)
+ def mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
- def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper, mockCmNotificationSubscriptionCache, mockCmNotificationSubscriptionNcmpOutEventMapper)
+ def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper,
+ mockCmNotificationSubscriptionNcmpOutEventMapper, mockDmiCmNotificationSubscriptionCacheHandler)
def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
given: 'a cm subscription response for the client'
@@ -80,6 +80,8 @@ class CmNotificationSubscriptionNcmpOutEventProducerSpec extends Specification {
assert CloudEventMapper.toTargetEvent(cmNotificationSubscriptionNcmpOutEventAsCloudEvent, CmNotificationSubscriptionNcmpOutEvent) == cmNotificationSubscriptionNcmpOutEvent
}
}
+ then: 'the cache handler is called once to remove accepted and rejected entries in cache'
+ 1 * mockDmiCmNotificationSubscriptionCacheHandler.removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
index 10e060fee6..43568be501 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
@@ -25,6 +25,7 @@ import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
@@ -75,6 +76,37 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
assert testCache.containsKey(subscriptionId)
}
+ def 'Get cache entry via subscription id'() {
+ given: 'the cache contains value for some-id'
+ testCache.put('some-id',[:])
+ when: 'the get method is called'
+ def result = objectUnderTest.get('some-id')
+ then: 'correct value is returned as expected'
+ assert result == [:]
+ }
+
+ def 'Remove accepted and rejected entries from cache via subscription id'() {
+ given: 'a map as the value for cache entry for some-id'
+ def testMap = [:]
+ testMap.put("dmi-1",
+ new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.ACCEPTED))
+ testMap.put("dmi-2",
+ new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.REJECTED))
+ testMap.put("dmi-3",
+ new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.PENDING))
+ testCache.put("test-id", testMap)
+ assert testCache.get("test-id").size() == 3
+ when: 'the method to remove accepted and rejected entries for test-id is called'
+ objectUnderTest.removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries("test-id")
+ then: 'all entries with status accepted/rejected are no longer present for test-id'
+ testCache.get("test-id").each { key, testResultMap ->
+ assert testResultMap.cmNotificationSubscriptionStatus != CmNotificationSubscriptionStatus.ACCEPTED
+ || testResultMap.cmNotificationSubscriptionStatus != CmNotificationSubscriptionStatus.REJECTED
+ }
+ and: 'the size of the map for cache entry test-id is as expected'
+ assert testCache.get("test-id").size() == 1
+ }
+
def 'Create map for DMI cm notification subscription per DMI service name'() {
given: 'list of predicates from the create subscription event'
def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()