summaryrefslogtreecommitdiffstats
path: root/src/test/groovy/org/onap/cps/ncmp
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/groovy/org/onap/cps/ncmp')
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy18
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy42
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy8
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy21
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy3
5 files changed, 68 insertions, 24 deletions
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
index e5f00f33..13dd043d 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
@@ -20,10 +20,12 @@
package org.onap.cps.ncmp.dmi.api.kafka
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer
@@ -45,28 +47,30 @@ class MessagingBaseSpec extends Specification {
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
- def producerConfigProperties() {
+ def producerConfigProperties(valueSerializer) {
return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
('retries') : 0,
('batch-size') : 16384,
('linger.ms') : 1,
('buffer.memory') : 33554432,
('key.serializer') : StringSerializer,
- ('value.serializer') : JsonSerializer]
+ ('value.serializer') : valueSerializer]
}
- def consumerConfigProperties(consumerGroupId) {
+ def consumerConfigProperties(consumerGroupId, valueDeserializer) {
return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
('key.deserializer') : StringDeserializer,
- ('value.deserializer'): StringDeserializer,
+ ('value.deserializer'): valueDeserializer,
('auto.offset.reset') : 'earliest',
('group.id') : consumerGroupId
]
}
- def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
+ def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties(JsonSerializer)))
+ def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', StringDeserializer))
- def consumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String, CloudEvent>(producerConfigProperties(CloudEventSerializer)))
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', CloudEventDeserializer))
@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy
new file mode 100644
index 00000000..f09434be
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy
@@ -0,0 +1,42 @@
+package org.onap.cps.ncmp.dmi.config.kafka
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.support.serializer.JsonSerializer
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+class KafkaConfigSpec extends Specification {
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, String> legacyEventKafkaTemplate
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+
+ def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
+ expect: 'kafka template is instantiated'
+ assert kafkaTemplateInstance.properties['beanName'] == beanName
+ and: 'verify event key and value serializer'
+ assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName())
+ and: 'verify event key and value deserializer'
+ assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
+ where: 'the following event type is used'
+ eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
+ 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
+ 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
+ }
+}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
index 96e2c16d..7ca2d54c 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
@@ -49,18 +49,18 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
def setup() {
cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC
- consumer.subscribe([TEST_TOPIC] as List<String>)
+ kafkaConsumer.subscribe([TEST_TOPIC] as List<String>)
}
def cleanup() {
- consumer.close()
+ kafkaConsumer.close()
}
def 'Publish and Subscribe message - success'() {
when: 'a successful event is published'
objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200')
and: 'the topic is polled'
- def records = consumer.poll(Duration.ofMillis(1500))
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'the record received is the event sent'
def record = records.iterator().next()
DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
@@ -74,7 +74,7 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR)
objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', exception)
and: 'the topic is polled'
- def records = consumer.poll(Duration.ofMillis(1500))
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'the record received is the event sent'
def record = records.iterator().next()
DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
index 5f7ed878..a7557bb9 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
@@ -21,12 +21,10 @@
package org.onap.cps.ncmp.dmi.notifications.avc
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor
-import org.onap.cps.ncmp.dmi.service.DmiService
-import org.onap.cps.ncmp.dmi.notifications.avc.DmiDataAvcEventSimulationController
-import org.onap.cps.ncmp.event.model.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
import org.spockframework.spring.SpringBean
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
@@ -40,9 +38,9 @@ import java.time.Duration
class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
@SpringBean
- DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(kafkaTemplate)
+ DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(cloudEventKafkaTemplate)
- def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
+ def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
def objectMapper = new ObjectMapper()
@@ -50,13 +48,14 @@ class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
given: 'a simulated event'
dmiService.simulateEvents(1)
and: 'a consumer subscribed to dmi-cm-events topic'
- def consumer = new KafkaConsumer<>(consumerConfigProperties('test'))
- consumer.subscribe(['dmi-cm-events'])
+ cloudEventKafkaConsumer.subscribe(['dmi-cm-events'])
when: 'the next event record is consumed'
- def record = consumer.poll(Duration.ofMillis(1500)).iterator().next()
+ def record = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).iterator().next()
then: 'record has correct topic'
assert record.topic == 'dmi-cm-events'
and: 'the record value can be mapped to an avcEvent'
- objectMapper.readValue(record.value(), AvcEvent)
+ def dmiDataAvcEvent = record.value()
+ def convertedAvcEvent = CloudEventUtils.mapData(dmiDataAvcEvent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+ assert convertedAvcEvent != null
}
} \ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
index 65567ef8..59873ecf 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.dmi.notifications.avc
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.dmi.TestUtils
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse
@@ -39,8 +40,6 @@ import java.time.Duration
@DirtiesContext
class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
-
def objectMapper = new ObjectMapper()
def testTopic = 'dmi-ncmp-cm-avc-subscription'