From 2dd658268202f92e0b5621093747f13670e5d042 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Mon, 4 Nov 2024 15:59:58 +0000 Subject: Cm Avc Event to have same key - incoming Cm Avc Event from DMI Plugin is consumed and forwarded to target topic - the key from source topic to be used in the target topic while forwarding - with same key the ordering of the message will be preserved - NOTE: the RTD related changes will be a separate patchset Issue-ID: CPS-2436 Change-Id: Ie692663706b378022ec0d621d92ca5054bad8d1b Signed-off-by: mpriyank --- .../cmavc/CmAvcEventConsumerSpec.groovy | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'cps-ncmp-service/src/test/groovy/org') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy index 06651be913..ad5f42ed94 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -64,6 +64,7 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') + def testEventKey = 'sample-eventid-key' def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) def testCloudEventSent = CloudEventBuilder.v1() .withData(jsonObjectMapper.asJsonBytes(testEventSent)) @@ -72,17 +73,19 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { .withSource(URI.create('sample-test-source')) .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) - when: 'the event is consumed' + def consumerRecord = new ConsumerRecord(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent) + when: 'the event is consumed and forwarded to target topic' acvEventConsumer.consumeAndForward(consumerRecord) - and: 'the topic is polled' + and: 'the target topic is polled' def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 - and: 'record can be converted to AVC event' + and: 'target record can be converted to AVC event' def record = records.iterator().next() def cloudEvent = record.value() as CloudEvent def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class) + and: 'the target event has the same key as the source event to maintain the ordering in a partition' + assert record.key() == consumerRecord.key() and: 'we have correct headers forwarded where correlation id matches' assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id is same between consumed and forwarded' -- cgit 1.2.3-korg