aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java8
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy11
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy2
-rw-r--r--k6-tests/once-off-test/kafka/produce-avc-event.js8
4 files changed, 20 insertions, 9 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
index 9e90eabbc4..2d1f64802b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
@@ -46,7 +46,8 @@ public class CmAvcEventConsumer {
private final EventsPublisher<CloudEvent> eventsPublisher;
/**
- * Incoming AvcEvent in the form of Consumer Record.
+ * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic.
+ * The key from incoming record will be used as key for the target topic as well to preserve the message ordering.
*
* @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
@@ -55,7 +56,8 @@ public class CmAvcEventConsumer {
public void consumeAndForward(
final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
- log.debug("Consuming AVC event {} ...", outgoingAvcEvent);
- eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEvent.getId(), outgoingAvcEvent);
+ final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
+ log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
index 06651be913..ad5f42ed94 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
@@ -64,6 +64,7 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
+ def testEventKey = 'sample-eventid-key'
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(jsonObjectMapper.asJsonBytes(testEventSent))
@@ -72,17 +73,19 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
.withSource(URI.create('sample-test-source'))
.withExtension('correlationid', 'test-cmhandle1').build()
and: 'event has header information'
- def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
- when: 'the event is consumed'
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+ when: 'the event is consumed and forwarded to target topic'
acvEventConsumer.consumeAndForward(consumerRecord)
- and: 'the topic is polled'
+ and: 'the target topic is polled'
def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
- and: 'record can be converted to AVC event'
+ and: 'target record can be converted to AVC event'
def record = records.iterator().next()
def cloudEvent = record.value() as CloudEvent
def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
+ and: 'the target event has the same key as the source event to maintain the ordering in a partition'
+ assert record.key() == consumerRecord.key()
and: 'we have correct headers forwarded where correlation id matches'
assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id is same between consumed and forwarded'
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
index 00ce38fa2d..ffcba025e8 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
@@ -242,7 +242,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
if (retryAttempts == 0)
break
}
- consumerRecords
+ return consumerRecords
}
}
diff --git a/k6-tests/once-off-test/kafka/produce-avc-event.js b/k6-tests/once-off-test/kafka/produce-avc-event.js
index 981a21af65..db222f6a4a 100644
--- a/k6-tests/once-off-test/kafka/produce-avc-event.js
+++ b/k6-tests/once-off-test/kafka/produce-avc-event.js
@@ -50,6 +50,11 @@ export const options = {
}
};
+const getRandomNetworkElement = () => {
+ const networkElementIds = Array.from({ length: 10 }, (_, i) => `neType-${i + 1}`);
+ return networkElementIds[Math.floor(Math.random() * networkElementIds.length)];
+};
+
function getCloudEventHeaders() {
return {
ce_type: 'org.onap.cps.ncmp.events.avc1_0_0.AvcEvent',
@@ -65,10 +70,11 @@ function getCloudEventHeaders() {
export function sendKafkaMessages() {
const cloudEventHeaders = getCloudEventHeaders();
+ const networkElementId = getRandomNetworkElement();
const avcCloudEvent = {
key: schemaRegistry.serialize({
- data: cloudEventHeaders.ce_correlationid,
+ data: networkElementId,
schemaType: SCHEMA_TYPE_STRING,
}),
value: schemaRegistry.serialize({