diff options
author | mpriyank <priyank.maheshwari@est.tech> | 2023-05-04 11:24:29 +0100 |
---|---|---|
committer | mpriyank <priyank.maheshwari@est.tech> | 2023-05-10 13:42:12 +0100 |
commit | 87f0b004fb0b15f3e8fa30d39bdf8ae3310b8743 (patch) | |
tree | 0be61c2481fd560a01b3170f94631ff2791bee4c /cps-ncmp-service/src/test/groovy/org | |
parent | 492b6660fb153dd3dbf52c693a0b86bed3bee4f5 (diff) |
DMI Data AVC to use kafka headers
- POC done keeping AvcEvent schema in mind.
- Approach to have header schema per event schema.
- Moved the header information from AvcEvent to separate AvcEventHeader
schema.
- Added Jsr303 annotation support for required field check
Issue-ID: CPS-1671
Change-Id: I2e4f969e8ca4f6282d1b9aa5fd52d16174a26084
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/test/groovy/org')
-rw-r--r-- | cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy | 39 |
1 files changed, 26 insertions, 13 deletions
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index d57527a454..5f54bbe3dd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -1,6 +1,6 @@ /* - * ============LICENSE_START======================================================= - * Copyright (c) 2023 Nordix Foundation. + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,17 +21,20 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.header.internals.RecordHeader import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.event.model.AvcEvent +import org.onap.cps.ncmp.events.avc.v1.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext +import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -63,23 +66,33 @@ class AvcEventConsumerSpec extends MessagingBaseSpec { and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + and: 'event has header information' + def consumerRecord = new ConsumerRecord<String,AvcEvent>(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) + consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) + consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) when: 'the event is consumed' - acvEventConsumer.consumeAndForward(testEventSent) + acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent) - and: 'consumed forwarded NCMP event id differs from DMI event id' - assert testEventSent.eventId != convertedAvcEvent.getEventId() - and: 'correlation id matches' - assert testEventSent.eventCorrelationId == convertedAvcEvent.getEventCorrelationId() - and: 'timestamps match' - assert testEventSent.eventTime == convertedAvcEvent.getEventTime() - and: 'target matches' - assert testEventSent.eventSource == convertedAvcEvent.getEventSource() + def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + and: 'we have correct headers forwarded where correlation id matches' + record.headers().forEach(header -> { + if (header.key().equals('eventCorrelationId')) { + assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' + } + }) + and: 'event id differs(as per requirement) between consumed and forwarded' + record.headers().forEach(header -> { + if (header.key().equals('eventId')) { + assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' + } + }) + and: 'the event payload still matches' + assert testEventSent == convertedAvcEvent } }
\ No newline at end of file |