diff options
4 files changed, 20 insertions, 9 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java index 9e90eabbc4..2d1f64802b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java @@ -46,7 +46,8 @@ public class CmAvcEventConsumer { private final EventsPublisher<CloudEvent> eventsPublisher; /** - * Incoming AvcEvent in the form of Consumer Record. + * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic. + * The key from incoming record will be used as key for the target topic as well to preserve the message ordering. * * @param cmAvcEventAsConsumerRecord Incoming raw consumer record */ @@ -55,7 +56,8 @@ public class CmAvcEventConsumer { public void consumeAndForward( final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) { final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value(); - log.debug("Consuming AVC event {} ...", outgoingAvcEvent); - eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEvent.getId(), outgoingAvcEvent); + final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key(); + log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent); + eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy index 06651be913..ad5f42ed94 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -64,6 +64,7 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') + def testEventKey = 'sample-eventid-key' def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) def testCloudEventSent = CloudEventBuilder.v1() .withData(jsonObjectMapper.asJsonBytes(testEventSent)) @@ -72,17 +73,19 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { .withSource(URI.create('sample-test-source')) .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) - when: 'the event is consumed' + def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent) + when: 'the event is consumed and forwarded to target topic' acvEventConsumer.consumeAndForward(consumerRecord) - and: 'the topic is polled' + and: 'the target topic is polled' def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 - and: 'record can be converted to AVC event' + and: 'target record can be converted to AVC event' def record = records.iterator().next() def cloudEvent = record.value() as CloudEvent def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class) + and: 'the target event has the same key as the source event to maintain the ordering in a partition' + assert record.key() == consumerRecord.key() and: 'we have correct headers forwarded where correlation id matches' assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id is same between consumed and forwarded' diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy index 00ce38fa2d..ffcba025e8 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy @@ -242,7 +242,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { if (retryAttempts == 0) break } - consumerRecords + return consumerRecords } } diff --git a/k6-tests/once-off-test/kafka/produce-avc-event.js b/k6-tests/once-off-test/kafka/produce-avc-event.js index 981a21af65..db222f6a4a 100644 --- a/k6-tests/once-off-test/kafka/produce-avc-event.js +++ b/k6-tests/once-off-test/kafka/produce-avc-event.js @@ -50,6 +50,11 @@ export const options = { } }; +const getRandomNetworkElement = () => { + const networkElementIds = Array.from({ length: 10 }, (_, i) => `neType-${i + 1}`); + return networkElementIds[Math.floor(Math.random() * networkElementIds.length)]; +}; + function getCloudEventHeaders() { return { ce_type: 'org.onap.cps.ncmp.events.avc1_0_0.AvcEvent', @@ -65,10 +70,11 @@ function getCloudEventHeaders() { export function sendKafkaMessages() { const cloudEventHeaders = getCloudEventHeaders(); + const networkElementId = getRandomNetworkElement(); const avcCloudEvent = { key: schemaRegistry.serialize({ - data: cloudEventHeaders.ce_correlationid, + data: networkElementId, schemaType: SCHEMA_TYPE_STRING, }), value: schemaRegistry.serialize({ |