summaryrefslogtreecommitdiffstats
path: root/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy')
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy18
1 files changed, 11 insertions, 7 deletions
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
index e5f00f33..13dd043d 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
@@ -20,10 +20,12 @@
package org.onap.cps.ncmp.dmi.api.kafka
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer
@@ -45,28 +47,30 @@ class MessagingBaseSpec extends Specification {
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
- def producerConfigProperties() {
+ def producerConfigProperties(valueSerializer) {
return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
('retries') : 0,
('batch-size') : 16384,
('linger.ms') : 1,
('buffer.memory') : 33554432,
('key.serializer') : StringSerializer,
- ('value.serializer') : JsonSerializer]
+ ('value.serializer') : valueSerializer]
}
- def consumerConfigProperties(consumerGroupId) {
+ def consumerConfigProperties(consumerGroupId, valueDeserializer) {
return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
('key.deserializer') : StringDeserializer,
- ('value.deserializer'): StringDeserializer,
+ ('value.deserializer'): valueDeserializer,
('auto.offset.reset') : 'earliest',
('group.id') : consumerGroupId
]
}
- def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
+ def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties(JsonSerializer)))
+ def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', StringDeserializer))
- def consumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String, CloudEvent>(producerConfigProperties(CloudEventSerializer)))
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', CloudEventDeserializer))
@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {