summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java21
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java43
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy33
4 files changed, 126 insertions, 6 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
index ea72fd217b..82ae5467e2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
@@ -23,16 +23,21 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
+@RequiredArgsConstructor
public class CmNotificationSubscriptionDmiOutEventConsumer {
+ private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
+
/**
* Consume the Cm Notification Subscription event from the dmi-plugin.
*
@@ -56,7 +61,23 @@ public class CmNotificationSubscriptionDmiOutEventConsumer {
final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent) {
final String subscriptionId = correlationId.split("#")[0];
final String dmiPluginName = correlationId.split("#")[1];
+
+ if ("ACCEPTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
+ dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+
+ if ("REJECTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
+ }
+
log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
}
+
+ private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
+ final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
+ dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
+ dmiPluginName, cmNotificationSubscriptionStatus);
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
index 2f10b1ce1e..8c1cac39dc 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
@@ -30,8 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
@@ -42,6 +44,7 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class DmiCmNotificationSubscriptionCacheHandler {
+ private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
private final InventoryPersistence inventoryPersistence;
@@ -83,6 +86,46 @@ public class DmiCmNotificationSubscriptionCacheHandler {
return dmiCmNotificationSubscriptionDetailsPerDmi;
}
+ /**
+ * Update status in map of subscription details per DMI.
+ *
+ * @param subscriptionId String of subscription Id
+ * @param dmiServiceName String of dmiServiceName
+ * @param status String of status
+ *
+ */
+ public void updateDmiCmNotificationSubscriptionStatusPerDmi(
+ final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) {
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .setCmNotificationSubscriptionStatus(status);
+ }
+
+ /**
+ * Persist map of subscription details per DMI.
+ *
+ * @param subscriptionId String of subscription Id
+ * @param dmiServiceName String of dmiServiceName
+ *
+ */
+ public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
+ final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .getDmiCmNotificationSubscriptionPredicates();
+ for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
+ dmiCmNotificationSubscriptionPredicateList) {
+ final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+
+ for (final String cmHandle: cmHandles) {
+ for (final String xpath: xpaths) {
+ cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,
+ cmHandle, xpath, subscriptionId);
+ }
+ }
+ }
+ }
+
private void updateDmiCmNotificationSubscriptionDetailsPerDmi(
final String dmiServiceName,
final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate,
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
index 4f0132e2bd..523ec767c6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
@@ -28,26 +28,32 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
+import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpec {
- def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer()
- def logger = Spy(ListAppender<ILoggingEvent>)
-
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
ObjectMapper objectMapper
+ @SpringBean
+ DmiCmNotificationSubscriptionCacheHandler mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
+
+ def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer(mockDmiCmNotificationSubscriptionCacheHandler)
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
void setup() {
((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionDmiOutEventConsumer.class)).addAppender(logger)
logger.start()
@@ -78,6 +84,29 @@ class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpe
assert loggingEvent.formattedMessage == 'Cm Subscription with id : sub-1 handled by the dmi-plugin : test-dmi-plugin-name has the status : accepted'
}
+ def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() {
+ given: 'a cmNotificationSubscription event'
+ def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString())
+ def cmNotificationSubscriptionDmiOutEvent = new CmNotificationSubscriptionDmiOutEvent().withData(dmiOutEventData)
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(cmNotificationSubscriptionDmiOutEvent))
+ .withId('random-uuid')
+ .withType('subscriptionCreateResponse')
+ .withSource(URI.create('test-dmi-plugin-name'))
+ .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ when: 'the event is consumed'
+ objectUnderTest.consumeCmNotificationSubscriptionDmiOutEvent(consumerRecord)
+ then: 'correct number of calls to cache'
+ expectedCacheCalls * mockDmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ and: 'correct number of calls to persist cache'
+ expectedPersistenceCalls * mockDmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
+ where: 'the following parameters are used'
+ scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
+ 'Accepted Status' | CmNotificationSubscriptionStatus.ACCEPTED | '1' || 1 | 1
+ 'Rejected Status' | CmNotificationSubscriptionStatus.REJECTED | '2' || 1 | 0
+ }
+
def getLoggingEvent() {
return logger.list[0]
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
index 132c4bc4a7..47a1c89468 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -45,9 +47,11 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
ObjectMapper objectMapper
@SpringBean
InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
+ @SpringBean
+ CmNotificationSubscriptionPersistenceService mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService)
def testCache = [:]
- def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(testCache, mockInventoryPersistence)
+ def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(mockCmNotificationSubscriptionPersistenceService, testCache, mockInventoryPersistence)
CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent
def yangModelCmHandle1 = new YangModelCmHandle(id:'ch1',dmiServiceName:'dmi-1')
@@ -62,9 +66,9 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
def 'Load CM subscription event to cache'() {
given: 'a valid subscription event with Id'
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId();
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
and: 'list of predicates'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates();
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
when: 'a valid event object loaded in cache'
objectUnderTest.add(subscriptionId, predicates)
then: 'the cache contains the correct entry with #subscriptionId subscription ID'
@@ -115,6 +119,29 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
assert mapOfCMHandleIDsByDmi.get('dmi-2') == ['ch2'].toSet()
}
+ def 'Update subscription status in cache per DMI service name'() {
+ given: 'populated cache'
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ objectUnderTest.add(subscriptionId, predicates)
+ when: 'subscription status per dmi is updated in cache'
+ objectUnderTest.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmNotificationSubscriptionStatus.ACCEPTED)
+ then: 'verify status has been updated in cache'
+ def predicate = testCache.get(subscriptionId)
+ assert predicate.get('dmi-1').cmNotificationSubscriptionStatus == CmNotificationSubscriptionStatus.ACCEPTED
+ }
+
+ def 'Persist Cache into database per dmi'() {
+ given: 'populate cache'
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ objectUnderTest.add(subscriptionId, predicates)
+ when: 'subscription is persisted in database'
+ objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
+ then: 'persistence service is called the correct number of times per dmi'
+ 4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId)
+ }
+
def setUpTestEvent(){
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)