diff options
author | mpriyank <priyank.maheshwari@est.tech> | 2023-06-20 13:42:31 +0100 |
---|---|---|
committer | mpriyank <priyank.maheshwari@est.tech> | 2023-06-30 12:29:19 +0100 |
commit | a9b8d9da4602727ca5810f62d250b6b941664b8c (patch) | |
tree | 0f70835cb2e47f209df34ce324a10a92f53e25e0 /src/test | |
parent | 1118bedbd3981c12aebcb9fa99e8744a9bf413c3 (diff) |
DMI Data AVC RFC8641 and CloudEvent Compliant
- Introduced CloudEvents for DMI Data AVC Events
- Kafkatemplate config to support legacy as well as CloudEvents
- AvcEvent to be compliant with RFC8641 schema format
- Updating the released version of CPS and NCMP 3.3.3
- Refactored the test code to handle the changes related to CloudEvents
Issue-ID: CPS-1719
Change-Id: I082bbceda6dc26c860e1eff977ede219296d1875
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'src/test')
6 files changed, 77 insertions, 25 deletions
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy index e5f00f33..13dd043d 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy @@ -20,10 +20,12 @@ package org.onap.cps.ncmp.dmi.api.kafka +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer -import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.serializer.JsonSerializer @@ -45,28 +47,30 @@ class MessagingBaseSpec extends Specification { static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka')) - def producerConfigProperties() { + def producerConfigProperties(valueSerializer) { return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0], ('retries') : 0, ('batch-size') : 16384, ('linger.ms') : 1, ('buffer.memory') : 33554432, ('key.serializer') : StringSerializer, - ('value.serializer') : JsonSerializer] + ('value.serializer') : valueSerializer] } - def consumerConfigProperties(consumerGroupId) { + def consumerConfigProperties(consumerGroupId, valueDeserializer) { return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0], ('key.deserializer') : StringDeserializer, - ('value.deserializer'): StringDeserializer, + ('value.deserializer'): valueDeserializer, ('auto.offset.reset') : 'earliest', ('group.id') : consumerGroupId ] } - def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties())) + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties(JsonSerializer))) + def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', StringDeserializer)) - def consumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) + def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String, CloudEvent>(producerConfigProperties(CloudEventSerializer))) + def cloudEventKafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', CloudEventDeserializer)) @DynamicPropertySource static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy new file mode 100644 index 00000000..f09434be --- /dev/null +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy @@ -0,0 +1,42 @@ +package org.onap.cps.ncmp.dmi.config.kafka + +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer +import org.spockframework.spring.EnableSharedInjection +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.kafka.support.serializer.JsonSerializer +import spock.lang.Shared +import spock.lang.Specification + +@SpringBootTest(classes = [KafkaProperties, KafkaConfig]) +@EnableSharedInjection +@EnableConfigurationProperties +class KafkaConfigSpec extends Specification { + + @Shared + @Autowired + KafkaTemplate<String, String> legacyEventKafkaTemplate + + @Shared + @Autowired + KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate + + def 'Verify kafka template serializer and deserializer configuration for #eventType.'() { + expect: 'kafka template is instantiated' + assert kafkaTemplateInstance.properties['beanName'] == beanName + and: 'verify event key and value serializer' + assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName()) + and: 'verify event key and value deserializer' + assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName()) + where: 'the following event type is used' + eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer + 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer + 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer + } +} diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy index 96e2c16d..7ca2d54c 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy @@ -49,18 +49,18 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec { def setup() { cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC - consumer.subscribe([TEST_TOPIC] as List<String>) + kafkaConsumer.subscribe([TEST_TOPIC] as List<String>) } def cleanup() { - consumer.close() + kafkaConsumer.close() } def 'Publish and Subscribe message - success'() { when: 'a successful event is published' objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200') and: 'the topic is polled' - def records = consumer.poll(Duration.ofMillis(1500)) + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'the record received is the event sent' def record = records.iterator().next() DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) @@ -74,7 +74,7 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec { def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR) objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', exception) and: 'the topic is polled' - def records = consumer.poll(Duration.ofMillis(1500)) + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'the record received is the event sent' def record = records.iterator().next() DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy index 5f7ed878..a7557bb9 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy @@ -21,12 +21,10 @@ package org.onap.cps.ncmp.dmi.notifications.avc import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.kafka.clients.consumer.KafkaConsumer +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.jackson.PojoCloudEventDataMapper import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor -import org.onap.cps.ncmp.dmi.service.DmiService -import org.onap.cps.ncmp.dmi.notifications.avc.DmiDataAvcEventSimulationController -import org.onap.cps.ncmp.event.model.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.spockframework.spring.SpringBean import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext @@ -40,9 +38,9 @@ import java.time.Duration class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec { @SpringBean - DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(kafkaTemplate) + DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(cloudEventKafkaTemplate) - def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer) + def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer) def objectMapper = new ObjectMapper() @@ -50,13 +48,14 @@ class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec { given: 'a simulated event' dmiService.simulateEvents(1) and: 'a consumer subscribed to dmi-cm-events topic' - def consumer = new KafkaConsumer<>(consumerConfigProperties('test')) - consumer.subscribe(['dmi-cm-events']) + cloudEventKafkaConsumer.subscribe(['dmi-cm-events']) when: 'the next event record is consumed' - def record = consumer.poll(Duration.ofMillis(1500)).iterator().next() + def record = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).iterator().next() then: 'record has correct topic' assert record.topic == 'dmi-cm-events' and: 'the record value can be mapped to an avcEvent' - objectMapper.readValue(record.value(), AvcEvent) + def dmiDataAvcEvent = record.value() + def convertedAvcEvent = CloudEventUtils.mapData(dmiDataAvcEvent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() + assert convertedAvcEvent != null } }
\ No newline at end of file diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy index 65567ef8..59873ecf 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.dmi.notifications.avc import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.dmi.TestUtils import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse @@ -39,8 +40,6 @@ import java.time.Duration @DirtiesContext class SubscriptionEventConsumerSpec extends MessagingBaseSpec { - def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) - def objectMapper = new ObjectMapper() def testTopic = 'dmi-ncmp-cm-avc-subscription' diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index d2aafbd7..43eb0fc5 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -59,8 +59,16 @@ spring: protocol: PLAINTEXT producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + value-serializer: io.cloudevents.kafka.CloudEventSerializer client-id: ncmp-dmi-plugin + consumer: + group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group} + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + properties: + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer + spring.json.use.type.headers: false app: ncmp: |