diff options
5 files changed, 114 insertions, 28 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java index 2c544b7b6a..fb3388c117 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java @@ -23,11 +23,13 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer; import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent; import io.cloudevents.CloudEvent; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionHandlerService; import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent; +import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -53,10 +55,16 @@ public class CmNotificationSubscriptionNcmpInEventConsumer { cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()); final String subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId(); + final List<Predicate> predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates(); if ("subscriptionCreateRequest".equals(cloudEvent.getType())) { - log.info("Subscription for source {} with subscription id {} ...", cloudEvent.getSource(), subscriptionId); - cmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest( - cmNotificationSubscriptionNcmpInEvent); + log.info("Subscription create request for source {} with subscription id {} ...", + cloudEvent.getSource(), subscriptionId); + cmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest(subscriptionId, predicates); + } + if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) { + log.info("Subscription delete request for source {} with subscription id {} ...", + cloudEvent.getSource(), subscriptionId); + cmNotificationSubscriptionHandlerService.processSubscriptionDeleteRequest(subscriptionId, predicates); } } }
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java index 536693ee4e..1c52ffa798 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java @@ -20,16 +20,25 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription.service; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent; +import java.util.List; +import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; public interface CmNotificationSubscriptionHandlerService { /** - * Process cm notification subscription request. + * Process cm notification subscription create request. * - * @param cmNotificationSubscriptionNcmpInEvent CM Notification Subscription event + * @param subscriptionId subscription id + * @param predicates subscription predicates */ - void processSubscriptionCreateRequest( - final CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent); + void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates); -} + /** + * Process cm notification subscription delete request. + * + * @param subscriptionId subscription id + * @param predicates subscription predicates + */ + void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates); + +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java index 128c6751ce..08e3c95529 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java @@ -33,7 +33,6 @@ import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscri import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus; import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent; import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent; import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; @@ -50,27 +49,32 @@ public class CmNotificationSubscriptionHandlerServiceImpl implements CmNotificat private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; @Override - public void processSubscriptionCreateRequest( - final CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent) { - final String subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId(); - final List<Predicate> predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates(); - + public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) { if (cmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) { dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates); handleCmNotificationSubscriptionDelta(subscriptionId); - scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId); + scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId, + "subscriptionCreateResponse"); } else { rejectAndPublishCmNotificationSubscriptionCreateRequest(subscriptionId, predicates); } } - private void scheduleCmNotificationSubscriptionNcmpOutEventResponse(final String subscriptionId) { + @Override + public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) { + dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates); + sendSubscriptionDeleteRequestToDmi(subscriptionId); + scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + } + + private void scheduleCmNotificationSubscriptionNcmpOutEventResponse(final String subscriptionId, + final String eventType) { cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - "subscriptionCreateResponse", null, true); + eventType, null, true); } private void rejectAndPublishCmNotificationSubscriptionCreateRequest(final String subscriptionId, - final List<Predicate> predicates) { + final List<Predicate> predicates) { final Set<String> subscriptionTargetFilters = predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream()) .collect(Collectors.toSet()); @@ -99,8 +103,9 @@ public class CmNotificationSubscriptionHandlerServiceImpl implements CmNotificat } private void publishCmNotificationSubscriptionDmiInEventPerDmi(final String subscriptionId, - final String dmiPluginName, - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { + final String dmiPluginName, + final List<DmiCmNotificationSubscriptionPredicate> + dmiCmNotificationSubscriptionPredicates) { final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent = cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent( dmiCmNotificationSubscriptionPredicates); @@ -109,9 +114,21 @@ public class CmNotificationSubscriptionHandlerServiceImpl implements CmNotificat } private void acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(final String subscriptionId, - final String dmiPluginName) { + final String dmiPluginName) { dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED); dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); } -} + + private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) { + final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap = + dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId); + dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> { + final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent = + cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent( + dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates()); + cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId, + dmiPluginName, "subscriptionDeleteRequest", cmNotificationSubscriptionDmiInEvent); + }); + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy index f07f3c1e6f..01a92c02fa 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy @@ -78,11 +78,34 @@ class CmNotificationSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpe def loggingEvent = getLoggingEvent() assert loggingEvent.level == Level.INFO and: 'the log indicates the task completed successfully' - assert loggingEvent.formattedMessage == 'Subscription for source some-resource with subscription id test-id ...' + assert loggingEvent.formattedMessage == 'Subscription create request for source some-resource with subscription id test-id ...' and: 'the subscription handler service is called once' - 1 * mockCmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest(_) + 1 * mockCmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest('test-id',_) } + def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() { + given: 'a cmNotificationSubscription event' + def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('sub-id') + .withType('subscriptionDeleteRequest') + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent) + when: 'the valid event is consumed' + objectUnderTest.consumeSubscriptionEvent(consumerRecord) + then: 'an event is logged with level INFO' + def loggingEvent = getLoggingEvent() + assert loggingEvent.level == Level.INFO + and: 'the log indicates the task completed successfully' + assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...' + and: 'the subscription handler service is called once' + 1 * mockCmNotificationSubscriptionHandlerService.processSubscriptionDeleteRequest('test-id',_) + } + + def getLoggingEvent() { return logger.list[1] } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy index 9156ae910f..982150ec0a 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy @@ -57,6 +57,9 @@ class CmNotificationSubscriptionHandlerServiceImplSpec extends Specification{ def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class) def testListOfDeltaPredicates = [new DmiCmNotificationSubscriptionPredicate(['ch1'].toSet(), DatastoreType.PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())] mockCmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true + and: 'relevant details is extracted from the event' + def subscriptionId = testEventConsumed.getData().getSubscriptionId() + def predicates = testEventConsumed.getData().getPredicates() and: 'the cache handler returns for relevant subscription id' 1 * mockDmiCmNotificationSubscriptionCacheHandler.get("test-id") >> testSubscriptionDetailsMap and: 'the delta predicates is returned' @@ -66,7 +69,7 @@ class CmNotificationSubscriptionHandlerServiceImplSpec extends Specification{ 1 * mockCmNotificationSubscriptionMappersHandler .toCmNotificationSubscriptionDmiInEvent(testListOfDeltaPredicates) >> testDmiInEvent when: 'the valid and unique event is consumed' - objectUnderTest.processSubscriptionCreateRequest(testEventConsumed) + objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates) then: 'the subscription cache handler is called once' 1 * mockDmiCmNotificationSubscriptionCacheHandler.add('test-id',_) and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters' @@ -88,7 +91,7 @@ class CmNotificationSubscriptionHandlerServiceImplSpec extends Specification{ and: 'the delta predicates is returned' 1 * mockCmNotificationSubscriptionDelta.getDelta(_) >> noDeltaPredicates when: 'the valid and unique event is consumed' - objectUnderTest.processSubscriptionCreateRequest(testEventConsumed) + objectUnderTest.processSubscriptionCreateRequest('test-id', noDeltaPredicates) then: 'the subscription cache handler is called once' 1 * mockDmiCmNotificationSubscriptionCacheHandler.add('test-id', _) and: 'the subscription details are updated in the cache' @@ -103,16 +106,42 @@ class CmNotificationSubscriptionHandlerServiceImplSpec extends Specification{ def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class) mockCmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId('test-id') >> false + and: 'relevant details is extracted from the event' + def subscriptionId = testEventConsumed.getData().getSubscriptionId() + def predicates = testEventConsumed.getData().getPredicates() and: 'the NCMP out in event mapper returns an event for rejected request' def testNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent() 1 * mockCmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest( "test-id",_) >> testNcmpOutEvent when: 'the valid but non-unique event is consumed' - objectUnderTest.processSubscriptionCreateRequest(testEventConsumed) + objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates) then: 'the events handler method to publish DMI event is never called' 0 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(_,_,_,_) and: 'the events handler method to publish NCMP out event is called once' 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent( 'test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false) } + + def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() { + given: 'a cmNotificationSubscriptionNcmp in event for delete' + def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json') + def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class) + and: 'relevant details is extracted from the event' + def subscriptionId = testEventConsumed.getData().getSubscriptionId() + def predicates = testEventConsumed.getData().getPredicates() + and: 'the cache handler returns for relevant subscription id' + 1 * mockDmiCmNotificationSubscriptionCacheHandler.get('test-id') >> testSubscriptionDetailsMap + when: 'the valid and unique event is consumed' + objectUnderTest.processSubscriptionDeleteRequest(subscriptionId, predicates) + then: 'the subscription cache handler is called once' + 1 * mockDmiCmNotificationSubscriptionCacheHandler.add('test-id', predicates) + and: 'the mapper handler to get DMI in event is called once' + 1 * mockCmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent(_) + and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters' + testSubscriptionDetailsMap.size() * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent( + 'test-id', 'dmi-1', 'subscriptionDeleteRequest', _) + and: 'we schedule to send the response after configured time from the cache' + 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent( + 'test-id', 'subscriptionDeleteResponse', null, true) + } } |